This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 50c8563f35 [fix](partial update) fix some bugs of sequence column 
(#21896)
50c8563f35 is described below

commit 50c8563f350926fbd0f5613563e9ab26ed2806c6
Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com>
AuthorDate: Sat Jul 22 15:26:48 2023 +0800

    [fix](partial update) fix some bugs of sequence column (#21896)
---
 be/src/olap/memtable.cpp                           |  22 +++-
 be/src/olap/memtable.h                             |   1 +
 be/src/olap/rowset/segment_v2/segment_writer.cpp   | 114 ++++++++++++------
 be/src/olap/tablet.cpp                             |   9 +-
 .../update-delete/sequence-column-manual.md        |   6 +
 .../update-delete/sequence-column-manual.md        |   8 +-
 .../apache/doris/analysis/NativeInsertStmt.java    |   4 +
 .../apache/doris/planner/StreamLoadPlanner.java    |   4 +
 .../doris/planner/external/LoadScanProvider.java   |  20 +++-
 .../partial_update/basic_with_seq.csv              |   3 +
 .../partial_update/test_partial_update_seq_col.out |  17 +++
 .../test_partial_update_seq_col.groovy             | 127 +++++++++++++++++++++
 12 files changed, 290 insertions(+), 45 deletions(-)

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 04a11fec0a..dc3d9d8be7 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -32,6 +32,7 @@
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/load_channel_mgr.h"
+#include "tablet_meta.h"
 #include "util/runtime_profile.h"
 #include "util/stopwatch.hpp"
 #include "vec/aggregate_functions/aggregate_function_reader.h"
@@ -174,6 +175,20 @@ void MemTable::insert(const vectorized::Block* 
input_block, const std::vector<in
         if (_keys_type != KeysType::DUP_KEYS) {
             _init_agg_functions(&target_block);
         }
+        if (_tablet_schema->has_sequence_col()) {
+            if (_tablet_schema->is_partial_update()) {
+                // for unique key partial update, sequence column index in 
block
+                // may be different from the index in `_tablet_schema`
+                for (size_t i = 0; i < cloneBlock.columns(); i++) {
+                    if (cloneBlock.get_by_position(i).name == SEQUENCE_COL) {
+                        _seq_col_idx_in_block = i;
+                        break;
+                    }
+                }
+            } else {
+                _seq_col_idx_in_block = _tablet_schema->sequence_col_idx();
+            }
+        }
     }
 
     auto num_rows = row_idxs.size();
@@ -197,10 +212,9 @@ void MemTable::insert(const vectorized::Block* 
input_block, const std::vector<in
 
 void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& 
mutable_block,
                                            RowInBlock* src_row, RowInBlock* 
dst_row) {
-    if (_tablet_schema->has_sequence_col()) {
-        auto sequence_idx = _tablet_schema->sequence_col_idx();
-        DCHECK_LT(sequence_idx, mutable_block.columns());
-        auto col_ptr = mutable_block.mutable_columns()[sequence_idx].get();
+    if (_tablet_schema->has_sequence_col() && _seq_col_idx_in_block >= 0) {
+        DCHECK_LT(_seq_col_idx_in_block, mutable_block.columns());
+        auto col_ptr = 
mutable_block.mutable_columns()[_seq_col_idx_in_block].get();
         auto res = col_ptr->compare_at(dst_row->_row_pos, src_row->_row_pos, 
*col_ptr, -1);
         // dst sequence column larger than src, don't need to update
         if (res > 0) {
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index aac071f8a4..ed9226c4a0 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -259,6 +259,7 @@ private:
     size_t _mem_usage;
 
     size_t _num_columns;
+    int32_t _seq_col_idx_in_block = -1;
 }; // class MemTable
 
 inline std::ostream& operator<<(std::ostream& os, const MemTable& table) {
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index fd518bfd59..2e991353f5 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -339,8 +339,10 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block, 
row_pos, num_rows,
                                                                    
including_cids);
 
+    bool have_input_seq_column = false;
     // write including columns
     std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
+    vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
     for (auto cid : including_cids) {
         // olap data convertor alway start from id = 0
         auto converted_result = _olap_data_convertor->convert_column_data(cid);
@@ -349,6 +351,10 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
         }
         if (cid < _num_key_columns) {
             key_columns.push_back(converted_result.second);
+        } else if (_tablet_schema->has_sequence_col() &&
+                   cid == _tablet_schema->sequence_col_idx()) {
+            seq_column = converted_result.second;
+            have_input_seq_column = true;
         }
         
RETURN_IF_ERROR(_column_writers[cid]->append(converted_result.second->get_nullmap(),
                                                      
converted_result.second->get_data(),
@@ -367,42 +373,55 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     // locate rows in base data
 
     int64_t num_rows_filtered = 0;
-    {
-        for (size_t pos = row_pos; pos < num_rows; pos++) {
-            std::string key = _full_encode_keys(key_columns, pos);
+    for (size_t pos = row_pos; pos < row_pos + num_rows; pos++) {
+        std::string key = _full_encode_keys(key_columns, pos);
+        if (have_input_seq_column) {
+            _encode_seq_column(seq_column, pos, &key);
+        }
+        // If the table has a sequence column, and the include-cids don't 
contain the sequence
+        // column, we need to update the primary key index builder at the end 
of this method.
+        // At that time, we have a valid sequence column to encode the key 
with seq col.
+        if (!_tablet_schema->has_sequence_col() || have_input_seq_column) {
             RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
-            _maybe_invalid_row_cache(key);
-
-            RowLocation loc;
-            // save rowset shared ptr so this rowset wouldn't delete
-            RowsetSharedPtr rowset;
-            auto st = _tablet->lookup_row_key(key, false, specified_rowsets, 
&loc,
-                                              _mow_context->max_version, 
segment_caches, &rowset);
-            if (st.is<NOT_FOUND>()) {
-                if (_tablet_schema->is_strict_mode()) {
-                    ++num_rows_filtered;
-                    // delete the invalid newly inserted row
-                    
_mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id, 0},
-                                                     pos);
-                }
-
-                if (!_tablet_schema->can_insert_new_rows_in_partial_update()) {
-                    return Status::InternalError(
-                            "the unmentioned columns should have default value 
or be nullable for "
-                            "newly inserted rows in non-strict mode partial 
update");
-                }
-                has_default_or_nullable = true;
-                use_default_or_null_flag.emplace_back(true);
-                continue;
+        }
+        _maybe_invalid_row_cache(key);
+
+        RowLocation loc;
+        // save rowset shared ptr so this rowset wouldn't delete
+        RowsetSharedPtr rowset;
+        auto st = _tablet->lookup_row_key(key, have_input_seq_column, 
specified_rowsets, &loc,
+                                          _mow_context->max_version, 
segment_caches, &rowset);
+        if (st.is<NOT_FOUND>()) {
+            if (_tablet_schema->is_strict_mode()) {
+                ++num_rows_filtered;
+                // delete the invalid newly inserted row
+                _mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, 
_segment_id, 0},
+                                                 pos);
             }
-            if (!st.ok()) {
-                LOG(WARNING) << "failed to lookup row key, error: " << st;
-                return st;
+
+            if (!_tablet_schema->can_insert_new_rows_in_partial_update()) {
+                return Status::InternalError(
+                        "the unmentioned columns should have default value or 
be nullable for "
+                        "newly inserted rows in non-strict mode partial 
update");
             }
-            // partial update should not contain invisible columns
-            use_default_or_null_flag.emplace_back(false);
-            _rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
-            _tablet->prepare_to_read(loc, pos, &_rssid_to_rid);
+            has_default_or_nullable = true;
+            use_default_or_null_flag.emplace_back(true);
+            continue;
+        }
+        if (!st.ok() && !st.is<ALREADY_EXIST>()) {
+            LOG(WARNING) << "failed to lookup row key, error: " << st;
+            return st;
+        }
+        // partial update should not contain invisible columns
+        use_default_or_null_flag.emplace_back(false);
+        _rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
+        _tablet->prepare_to_read(loc, pos, &_rssid_to_rid);
+
+        if (st.is<ALREADY_EXIST>()) {
+            // although we need to mark delete current row, we still need to 
read missing columns
+            // for this row, we need to ensure that each column is aligned
+            _mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, 
_segment_id, 0}, pos);
+        } else {
             _mow_context->delete_bitmap->add({loc.rowset_id, loc.segment_id, 
0}, loc.row_id);
         }
     }
@@ -427,13 +446,42 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
         if (converted_result.first != Status::OK()) {
             return converted_result.first;
         }
+        if (_tablet_schema->has_sequence_col() && !have_input_seq_column &&
+            cid == _tablet_schema->sequence_col_idx()) {
+            DCHECK_EQ(seq_column, nullptr);
+            seq_column = converted_result.second;
+        }
         
RETURN_IF_ERROR(_column_writers[cid]->append(converted_result.second->get_nullmap(),
                                                      
converted_result.second->get_data(),
                                                      num_rows));
     }
 
     _num_rows_filtered += num_rows_filtered;
+    if (_tablet_schema->has_sequence_col() && !have_input_seq_column) {
+        DCHECK_NE(seq_column, nullptr);
+        DCHECK_EQ(_num_rows_written, row_pos)
+                << "_num_rows_written: " << _num_rows_written << ", row_pos" 
<< row_pos;
+        DCHECK_EQ(_primary_key_index_builder->num_rows(), _num_rows_written)
+                << "primary key index builder num rows(" << 
_primary_key_index_builder->num_rows()
+                << ") not equal to segment writer's num rows written(" << 
_num_rows_written << ")";
+        if (_num_rows_written != row_pos ||
+            _primary_key_index_builder->num_rows() != _num_rows_written) {
+            return Status::InternalError(
+                    "Correctness check failed, _num_rows_written: {}, row_pos: 
{}, primary key "
+                    "index builder num rows: {}",
+                    _num_rows_written, row_pos, 
_primary_key_index_builder->num_rows());
+        }
+        for (size_t pos = row_pos; pos < row_pos + num_rows; pos++) {
+            std::string key = _full_encode_keys(key_columns, pos);
+            _encode_seq_column(seq_column, pos, &key);
+            RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
+        }
+    }
+
     _num_rows_written += num_rows;
+    DCHECK_EQ(_primary_key_index_builder->num_rows(), _num_rows_written)
+            << "primary key index builder num rows(" << 
_primary_key_index_builder->num_rows()
+            << ") not equal to segment writer's num rows written(" << 
_num_rows_written << ")";
     _olap_data_convertor->clear_source_content();
     return Status::OK();
 }
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 885c03f64b..f24c2da689 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2794,11 +2794,11 @@ Status Tablet::lookup_row_key(const Slice& encoded_key, 
bool with_seq_col,
             if (s.is<NOT_FOUND>()) {
                 continue;
             }
-            if (!s.ok()) {
+            if (!s.ok() && !s.is<ALREADY_EXIST>()) {
                 return s;
             }
-            if (_tablet_meta->delete_bitmap().contains_agg_without_cache(
-                        {loc.rowset_id, loc.segment_id, version}, loc.row_id)) 
{
+            if (s.ok() && 
_tablet_meta->delete_bitmap().contains_agg_without_cache(
+                                  {loc.rowset_id, loc.segment_id, version}, 
loc.row_id)) {
                 // if has sequence col, we continue to compare the sequence_id 
of
                 // all rowsets, util we find an existing key.
                 if (_schema->has_sequence_col()) {
@@ -2807,6 +2807,9 @@ Status Tablet::lookup_row_key(const Slice& encoded_key, 
bool with_seq_col,
                 // The key is deleted, we don't need to search for it any more.
                 break;
             }
+            // `s` is either OK or ALREADY_EXIST now.
+            // for partial update, even if the key already exists, we still 
need to
+            // read its original values to keep all columns aligned.
             *row_location = loc;
             if (rowset) {
                 // return it's rowset
diff --git a/docs/en/docs/data-operate/update-delete/sequence-column-manual.md 
b/docs/en/docs/data-operate/update-delete/sequence-column-manual.md
index 9dd91e373c..3517d0d9b9 100644
--- a/docs/en/docs/data-operate/update-delete/sequence-column-manual.md
+++ b/docs/en/docs/data-operate/update-delete/sequence-column-manual.md
@@ -248,3 +248,9 @@ MySQL [test]> select * from test_table;
 ```
 At this point, you can replace the original data in the table. To sum up, the 
sequence column will be compared among all the batches, the largest value of 
the same key will be imported into Doris table.
 
+## Note
+1. To prevent misuse, in load tasks like StreamLoad/BrokerLoad, user must 
specify the sequence column; otherwise, user will receive the following error 
message:
+```
+Table test_tbl has sequence column, need to specify the sequence column
+```
+2. Starting from version 2.0, Doris supports partial column updates for 
Merge-on-Write implementation on Unique Key tables. In load tasks with partial 
column update, users can update only a subset of columns at a time, so it is 
not mandatory to include the sequence column. If the import task submitted by 
the user includes the sequence column, it will have no effect on the behavior. 
However, if the import task does not include the sequence column, Doris will 
use the matching historical dat [...]
diff --git 
a/docs/zh-CN/docs/data-operate/update-delete/sequence-column-manual.md 
b/docs/zh-CN/docs/data-operate/update-delete/sequence-column-manual.md
index f488bf011d..709a52efdb 100644
--- a/docs/zh-CN/docs/data-operate/update-delete/sequence-column-manual.md
+++ b/docs/zh-CN/docs/data-operate/update-delete/sequence-column-manual.md
@@ -268,5 +268,9 @@ MySQL [test]> select * from test_table;
 
 此时就可以替换表中原有的数据。综上,在导入过程中,会比较所有批次的sequence列值,选择值最大的记录导入Doris表中。
 
-
-
+## 注意
+1. 为防止误用,在StreamLoad/BrokerLoad等导入任务中,必须要指定sequence列,不然会收到以下报错信息:
+```
+Table test_tbl has sequence column, need to specify the sequence column
+```
+2. 自版本2.0起,Doris对Unique 
Key表的Merge-on-Write实现支持了部分列更新能力,在部分列更新导入中,用户每次可以只更新一部分列,因此并不是必须要包含sequence列。若用户提交的导入任务中,包含sequence列,则行为无影响;若用户提交的导入任务不包含sequence列,Doris会使用匹配的历史数据中的sequence列作为更新后该行的sequence列的值。如果历史数据中不存在相同key的列,则会自动用null或默认值填充。
 
\ No newline at end of file
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 50a75e68da..2014aa1756 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -371,6 +371,10 @@ public class NativeInsertStmt extends InsertStmt {
                     targetPartitionIds.add(part.getId());
                 }
             }
+            if (isPartialUpdate && olapTable.hasSequenceCol() && 
olapTable.getSequenceMapCol() != null
+                    && 
partialUpdateCols.contains(olapTable.getSequenceMapCol())) {
+                partialUpdateCols.add(Column.SEQUENCE_COL);
+            }
             // need a descriptor
             DescriptorTable descTable = analyzer.getDescTbl();
             olapTuple = descTable.createTupleDescriptor();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 94bcf6ca79..8474703bee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -158,6 +158,10 @@ public class StreamLoadPlanner {
                                 + col.getName());
                         }
                         partialUpdateInputColumns.add(col.getName());
+                        if (destTable.hasSequenceCol() && 
destTable.getSequenceMapCol() != null
+                                && 
destTable.getSequenceMapCol().equalsIgnoreCase(col.getName())) {
+                            partialUpdateInputColumns.add(Column.SEQUENCE_COL);
+                        }
                         existInExpr = true;
                         break;
                     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index aabe21bf68..87946b5a2c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -52,6 +52,7 @@ import com.google.common.collect.Maps;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class LoadScanProvider {
 
@@ -181,11 +182,24 @@ public class LoadScanProvider {
         TableIf targetTable = getTargetTable();
         if (targetTable instanceof OlapTable && ((OlapTable) 
targetTable).hasSequenceCol()) {
             String sequenceCol = ((OlapTable) targetTable).getSequenceMapCol();
-            if (sequenceCol == null) {
+            if (sequenceCol != null) {
+                String finalSequenceCol = sequenceCol;
+                Optional<ImportColumnDesc> foundCol = 
columnDescs.descs.stream()
+                        .filter(c -> 
c.getColumnName().equalsIgnoreCase(finalSequenceCol)).findAny();
+                // if `columnDescs.descs` is empty, that means it's not a 
partial update load, and the user did not specify
+                // column names.
+                if (foundCol.isPresent() || columnDescs.descs.isEmpty()) {
+                    columnDescs.descs.add(new 
ImportColumnDesc(Column.SEQUENCE_COL,
+                            new SlotRef(null, sequenceCol)));
+                } else if (!fileGroupInfo.isPartialUpdate()) {
+                    throw new UserException("Table " + targetTable.getName()
+                            + " has sequence column, need to specify the 
sequence column");
+                }
+            } else {
                 sequenceCol = context.fileGroup.getSequenceCol();
+                columnDescs.descs.add(new ImportColumnDesc(Column.SEQUENCE_COL,
+                        new SlotRef(null, sequenceCol)));
             }
-            columnDescs.descs.add(new ImportColumnDesc(Column.SEQUENCE_COL,
-                    new SlotRef(null, sequenceCol)));
         }
         List<Integer> srcSlotIds = Lists.newArrayList();
         Load.initColumns(fileGroupInfo.getTargetTable(), columnDescs, 
context.fileGroup.getColumnToHadoopFunction(),
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/basic_with_seq.csv 
b/regression-test/data/unique_with_mow_p0/partial_update/basic_with_seq.csv
new file mode 100644
index 0000000000..68be488ced
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/basic_with_seq.csv
@@ -0,0 +1,3 @@
+2,2500,2023-07-19
+2,2600,2023-07-20
+1,1300,2022-07-19
\ No newline at end of file
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_seq_col.out
 
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_seq_col.out
new file mode 100644
index 0000000000..8246986dc1
--- /dev/null
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_seq_col.out
@@ -0,0 +1,17 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_default --
+1      doris   1000    123     1       2023-01-01
+2      doris2  2000    223     1       2023-01-01
+
+-- !partial_update_without_seq --
+1      doris   200     123     1       2023-01-01
+2      doris2  400     223     1       2023-01-01
+
+-- !partial_update_with_seq --
+1      doris   200     123     1       2023-01-01
+2      doris2  2600    223     1       2023-07-20
+
+-- !partial_update_with_seq_hidden_columns --
+1      doris   200     123     1       2023-01-01      0       3       
2023-01-01
+2      doris2  2600    223     1       2023-07-20      0       4       
2023-07-20
+
diff --git 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_seq_col.groovy
 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_seq_col.groovy
new file mode 100644
index 0000000000..aeb4cc2732
--- /dev/null
+++ 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_seq_col.groovy
@@ -0,0 +1,127 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_primary_key_partial_update_seq_col", "p0") {
+    def tableName = "test_primary_key_partial_update_seq_col"
+
+    // create table
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `id` int(11) NOT NULL COMMENT "用户 ID",
+                `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+                `score` int(11) NOT NULL COMMENT "用户得分",
+                `test` int(11) NULL COMMENT "null test",
+                `dft` int(11) DEFAULT "4321",
+                `update_time` date NULL)
+            UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES(
+                "replication_num" = "1",
+                "enable_unique_key_merge_on_write" = "true",
+                "function_column.sequence_col" = "update_time"
+            )
+    """
+    // insert 2 lines
+    sql """
+        insert into ${tableName} values
+            (2, "doris2", 2000, 223, 1, '2023-01-01'),
+            (1, "doris", 1000, 123, 1, '2023-01-01')
+    """
+
+    sql "sync"
+
+    qt_select_default """
+        select * from ${tableName} order by id;
+    """
+
+    // don't set partial update header, it's a row update streamload
+    // the input data doesn't contain the sequence mapping column, so the load will fail
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'id,score'
+
+        file 'basic.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("fail", json.Status.toLowerCase())
+            assertTrue(json.Message.contains('need to specify the sequence 
column'))
+        }
+    }
+
+
+    // set partial update header, should success
+    // we don't provide the sequence column in input data, so the updated rows
+    // should use their original sequence column values.
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'partial_columns', 'true'
+        set 'columns', 'id,score'
+
+        file 'basic.csv'
+        time 10000 // limit inflight 10s
+    }
+
+    sql "sync"
+
+    qt_partial_update_without_seq """
+        select * from ${tableName} order by id;
+    """
+
+    // provide the sequence column this time, should update according to the
+    // given sequence values
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'partial_columns', 'true'
+        set 'columns', 'id,score,update_time'
+
+        file 'basic_with_seq.csv'
+        time 10000 // limit inflight 10s
+    }
+
+    sql "sync"
+
+    qt_partial_update_with_seq """
+        select * from ${tableName} order by id;
+    """
+
+    sql "SET show_hidden_columns=true"
+
+    sql "sync"
+
+    qt_partial_update_with_seq_hidden_columns """
+        select * from ${tableName} order by id;
+    """
+
+    // drop table
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to