github-actions[bot] commented on code in PR #63850:
URL: https://github.com/apache/doris/pull/63850#discussion_r3375024673


##########
fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java:
##########
@@ -1629,13 +1664,87 @@ public OlapTableStreamUpdate getStreamUpdate() {
         Map<Long, Long> prev = Maps.newHashMap();
         Map<Long, Long> next = Maps.newHashMap();
         for (Long partitionId : getSelectedPartitionIds()) {
-            Pair<Long, Long> streamUpdate = ((OlapTableStreamWrapper) olapTable).getStreamUpdate(partitionId);
+            Pair<Long, Long> streamUpdate = getStreamUpdate(partitionId);
             if (streamUpdate.first != null) {
-                // prev could be null, ignore
+                // prev could be null, in case of historical scan
                 prev.put(partitionId, streamUpdate.first);
             }
-            next.put(partitionId, streamUpdate.second);
+            if (streamUpdate.second != null) {

Review Comment:
   Because the DUP_KEYS fallback is part of the first branch, any unknown 
`incrementType` on a duplicate-key table is silently accepted and coerced to 
`APPEND_ONLY`. For example, `SELECT * FROM 
dup_tbl@incr("incrementType"="TYPO")` reaches `parseBinlogScanType()`, 
`olapTable.getKeysType() == DUP_KEYS` makes this branch true, and the 
`Unsupported increment type` error is skipped. Please validate the string value 
first, then apply the duplicate-table default/fallback only for an 
omitted/default increment type.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java:
##########
@@ -21,11 +21,13 @@
 import org.apache.doris.analysis.StmtType;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.RowBinlogTableWrapper;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.stream.AbstractTableStreamUpdate;
 import org.apache.doris.catalog.stream.OlapTableStreamUpdate;

Review Comment:
   This collects stream scans so `TableStreamUpdateInfo` can be attached to the 
transaction, but that update is only consumed by the non-cloud 
`GlobalTransactionMgr`/`DatabaseTransactionMgr` path. 
`CloudGlobalTransactionMgr.commitAndPublishTransaction()` commits through 
meta-service directly and never checks or applies 
`transactionState.getStreamUpdateInfos()`, so in cloud mode `INSERT INTO target 
SELECT ... FROM stream` can commit the target rows without advancing the stream 
offset; the next consumption will read the same changes again. Please either 
update the cloud commit path to lock/check/apply the stream offsets atomically, 
or reject table-stream consumption in cloud mode until it is supported.



##########
be/src/storage/iterator/block_reader.cpp:
##########
@@ -74,6 +77,302 @@ Status BlockReader::next_block_with_aggregation(Block* block, bool* eof) {
     return res;
 }
 
+Status BlockReader::_ensure_binlog_column_pos(const Block& src_block) {
+    if (_binlog_column_pos_inited) {
+        if (_binlog_op_pos >= 0 && _binlog_op_pos < src_block.columns() &&
+            src_block.get_by_position(_binlog_op_pos).name == kRowBinlogOpColName) {
+            return Status::OK();
+        }
+        _binlog_op_pos = -1;
+        _binlog_lsn_pos = -1;
+        _binlog_timestamp_pos = -1;
+        _binlog_column_pos_inited = false;
+    }
+
+    const uint32_t col_num = src_block.columns();
+    _before_column_idx.resize(col_num);
+    for (uint32_t i = 0; i < col_num; ++i) {
+        const auto& name = src_block.get_by_position(i).name;
+        if (name == kRowBinlogOpColName) {
+            _binlog_op_pos = static_cast<int>(i);
+        } else if (name == kRowBinlogLsnColName) {
+            _binlog_lsn_pos = static_cast<int>(i);
+        } else if (name == kRowBinlogTimestampColName) {
+            _binlog_timestamp_pos = static_cast<int>(i);
+        } else {
+            std::string before_name = build_before_column_name(name);
+            int tmp_idx = src_block.get_position_by_name(before_name);
+            _before_column_idx[i] = tmp_idx < 0 ? i : tmp_idx;
+        }
+    }
+    _binlog_column_pos_inited = true;
+    return Status::OK();
+}
+
+int64_t BlockReader::_read_binlog_op(const IColumn& col, size_t row) const {
+    const IColumn* cur = &col;
+    if (const auto* nullable = check_and_get_column<ColumnNullable>(*cur)) {
+        if (nullable->is_null_at(row)) {
+            return ROW_BINLOG_UNKNOWN;
+        }
+        cur = &nullable->get_nested_column();
+    }
+
+    if (const auto* int64_col = check_and_get_column<ColumnInt64>(*cur)) {
+        return int64_col->get_element(row);
+    }
+
+    return ROW_BINLOG_UNKNOWN;
+}
+
+Status BlockReader::_write_binlog_op(IColumn& col, int64_t op) const {
+    IColumn* cur = &col;
+    ColumnNullable* nullable = nullptr;
+    if (auto* n = typeid_cast<ColumnNullable*>(cur)) {
+        nullable = n;
+        cur = &nullable->get_nested_column();
+    }
+
+    if (auto* int64_col = typeid_cast<ColumnInt64*>(cur)) {
+        int64_col->insert_value(op);
+    } else {
+        return Status::InternalError("invalid column type");
+    }
+
+    if (nullable != nullptr) {
+        nullable->get_null_map_data().push_back(0);
+    }
+    return Status::OK();
+}
+
+bool BlockReader::_is_binlog_meta_column(int idx) const {
+    return idx == _binlog_op_pos || idx == _binlog_lsn_pos || idx == _binlog_timestamp_pos;
+}
+
+int BlockReader::_resolve_source_column_index(int idx, bool use_before) const {
+    if (!use_before || _is_binlog_meta_column(idx)) {
+        return idx;
+    }
+
+    return _before_column_idx[idx];
+}
+
+void BlockReader::_init_pending_row_columns(const Block& block) {
+    if (!_pending_row_columns.empty()) {
+        return;
+    }
+    _pending_row_columns = block.clone_empty_columns();
+}
+
+bool BlockReader::_emit_pending_row(MutableColumns& target_columns, size_t& output_row_count) {
+    if (!_has_pending_row) {
+        return false;
+    }
+    for (size_t i = 0; i < _pending_row_columns.size(); ++i) {
+        target_columns[i]->insert_from(*_pending_row_columns[i], 0);
+        _pending_row_columns[i]->clear();
+    }
+    _has_pending_row = false;
+    output_row_count++;
+    return true;
+}
+
+Status BlockReader::_append_change_row(MutableColumns& target_columns, const Block& src_block,
+                                       size_t row_pos, int64_t output_op, bool use_before) {
+    for (auto idx : _normal_columns_idx) {
+        int target_col_idx = _return_columns_loc[idx];
+        if (target_col_idx < 0) {
+            continue;
+        }
+        if (idx == _binlog_op_pos) {
+            RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx], output_op));
+            continue;
+        }
+        int source_idx = _resolve_source_column_index(idx, use_before);
+        target_columns[target_col_idx]->insert_from(*src_block.get_by_position(source_idx).column,
+                                                    row_pos);
+    }
+    return Status::OK();
+}
+
+Status BlockReader::_min_delta_next_block(Block* block, bool* eof) {
+    if (UNLIKELY(_eof && !_has_pending_row)) {
+        *eof = true;
+        return Status::OK();
+    }
+
+    if (_stored_data_columns.empty()) {
+        _stored_data_columns = _next_row.block->clone_empty_columns();
+    }
+
+    auto target_columns_guard = block->mutate_columns_scoped();
+    auto& target_columns = target_columns_guard.mutable_columns();
+    size_t output_row_count = 0;
+    _init_pending_row_columns(*block);
+    RETURN_IF_ERROR(_ensure_binlog_column_pos(*_next_row.block));
+    while (output_row_count < batch_max_rows()) {
+        if (_emit_pending_row(target_columns, output_row_count)) {
+            continue;
+        }
+        if (_eof) {
+            break;
+        }
+        bool need_pop = _stored_data_columns[0]->size() > 1;
+        for (size_t i = 0; i < _stored_data_columns.size(); ++i) {
+            if (need_pop) {
+                _stored_data_columns[i]->pop_back(1);
+            }
+            _stored_data_columns[i]->insert_from(*_next_row.block->get_by_position(i).column,
+                                                 _next_row.row_pos);
+        }
+        auto res = _vcollect_iter.next(&_next_row);
+        if (UNLIKELY(res.is<END_OF_FILE>())) {
+            _eof = true;
+        } else if (UNLIKELY(!res.ok())) {
+            return res;
+        }
+
+        if (!_eof && _next_row.is_same) {
+            continue;
+        }
+        size_t group_size = _stored_data_columns[0]->size();
+        auto first_op = _read_binlog_op(*_stored_data_columns[_binlog_op_pos], 0);
+        auto last_op = _read_binlog_op(*_stored_data_columns[_binlog_op_pos], group_size - 1);
+        auto result = AggregateFunctionMinDelta::calculate_result(first_op, last_op);
+        switch (result) {
+        case AggregateFunctionMinDelta::ResultType::SKIP:
+            break;
+        case AggregateFunctionMinDelta::ResultType::INSERT:
+            for (auto idx : _normal_columns_idx) {
+                int target_col_idx = _return_columns_loc[idx];
+                if (idx == _binlog_op_pos) {
+                    RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+                                                     STREAM_CHANGE_INSERT));
+                } else {
+                    target_columns[target_col_idx]->insert_from(*_stored_data_columns[idx],
+                                                                group_size - 1);
+                }
+            }
+            output_row_count++;
+            break;
+        case AggregateFunctionMinDelta::ResultType::DELETE:
+            for (auto idx : _normal_columns_idx) {
+                int target_col_idx = _return_columns_loc[idx];
+                if (idx == _binlog_op_pos) {
+                    RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+                                                     STREAM_CHANGE_DELETE));
+                } else {
+                    target_columns[target_col_idx]->insert_from(*_stored_data_columns[idx],
+                                                                group_size - 1);
+                }
+            }
+            output_row_count++;
+            break;
+        case AggregateFunctionMinDelta::ResultType::UPDATE_BEFORE_AFTER:
+            for (auto idx : _normal_columns_idx) {
+                int target_col_idx = _return_columns_loc[idx];
+                if (idx == _binlog_op_pos) {
+                    RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+                                                     STREAM_CHANGE_UPDATE_BEFORE));
+                } else if (idx == _binlog_lsn_pos) {
+                    target_columns[target_col_idx]->insert_from(*_stored_data_columns[idx],
+                                                                group_size - 1);
+                } else {
+                    int source_idx = _resolve_source_column_index(idx, true);
+                    target_columns[target_col_idx]->insert_from(*_stored_data_columns[source_idx],
+                                                                0);
+                }
+            }
+            output_row_count++;
+            if (output_row_count >= batch_max_rows()) {
+                for (auto idx : _normal_columns_idx) {
+                    int target_col_idx = _return_columns_loc[idx];
+                    if (idx == _binlog_op_pos) {
+                        RETURN_IF_ERROR(_write_binlog_op(*_pending_row_columns[target_col_idx],
+                                                         STREAM_CHANGE_UPDATE_AFTER));
+                    } else {
+                        _pending_row_columns[target_col_idx]->insert_from(
+                                *_stored_data_columns[idx], group_size - 1);
+                    }
+                }
+                _has_pending_row = true;
+            } else {
+                for (auto idx : _normal_columns_idx) {
+                    int target_col_idx = _return_columns_loc[idx];
+                    if (idx == _binlog_op_pos) {
+                        RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+                                                         STREAM_CHANGE_UPDATE_AFTER));
+                    } else {
+                        target_columns[target_col_idx]->insert_from(*_stored_data_columns[idx],
+                                                                    group_size - 1);
+                    }
+                }
+                output_row_count++;
+            }
+            break;
+        }
+
+        for (auto& col : _stored_data_columns) {
+            col->clear();

Review Comment:
   `_init_collect_iter()` sets `_eof = true` when `current_row(&_next_row)` 
returns `END_OF_FILE`, but a DETAIL scan still installs 
`_detail_change_next_block()`. On the first `next_block_with_aggregation()` for 
an empty row-binlog range, or for a start/end TSO range filtered down to no 
rows, this line dereferences `_next_row.block` before checking `_eof` or 
`_next_row.block == nullptr`, so the query can crash instead of returning EOF. 
Please mirror the `_min_delta_next_block()` guard and return `*eof = true` 
before touching `_next_row.block` when `_eof && !_has_pending_row`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to