This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 5e4674ab669f28c1b1292625719083ec44a5b153 Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com> AuthorDate: Fri Jan 26 15:58:43 2024 +0800 [fix](partial update) mishandling of exceptions in the publish phase may result in data loss (#30366) --- be/src/olap/tablet.cpp | 44 +++++- be/src/olap/tablet.h | 6 +- be/src/olap/txn_manager.cpp | 63 +------- be/src/olap/txn_manager.h | 40 ++++- .../fault_injection_p0/concurrency_update1.csv | 21 +++ .../fault_injection_p0/concurrency_update2.csv | 21 +++ .../fault_injection_p0/concurrency_update3.csv | 21 +++ ..._partial_update_publish_conflict_with_error.out | 47 ++++++ ...rtial_update_publish_conflict_with_error.groovy | 171 +++++++++++++++++++++ 9 files changed, 362 insertions(+), 72 deletions(-) diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 2af47a5b430..19d598d1275 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3272,16 +3272,28 @@ Status Tablet::commit_phase_update_delete_bitmap( return Status::OK(); } -Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, - const RowsetIdUnorderedSet& pre_rowset_ids, - DeleteBitmapPtr delete_bitmap, int64_t txn_id, - RowsetWriter* rowset_writer) { +Status Tablet::update_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id) { SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency); RowsetIdUnorderedSet cur_rowset_ids; RowsetIdUnorderedSet rowset_ids_to_add; RowsetIdUnorderedSet rowset_ids_to_del; + RowsetSharedPtr rowset = txn_info->rowset; int64_t cur_version = rowset->start_version(); + std::unique_ptr<RowsetWriter> rowset_writer; + RETURN_IF_ERROR( + create_transient_rowset_writer(rowset, &rowset_writer, txn_info->partial_update_info)); + + DeleteBitmapPtr delete_bitmap = txn_info->delete_bitmap; + // Partial update might generate new segments when there is conflicts while publish, and mark + // the same key in original segments as delete. 
+ // When the new segment flush fails or the rowset build fails, the deletion marker for the + // duplicate key of the original segment should not remain in `txn_info->delete_bitmap`, + // so we need to make a copy of `txn_info->delete_bitmap` and make changes on it. + if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update) { + delete_bitmap = std::make_shared<DeleteBitmap>(*(txn_info->delete_bitmap)); + } + OlapStopWatch watch; std::vector<segment_v2::SegmentSharedPtr> segments; RETURN_IF_ERROR(_load_rowset_segments(rowset, &segments)); @@ -3299,7 +3311,8 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, } auto t2 = watch.get_elapse_time_us(); - _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, &rowset_ids_to_del); + _rowset_ids_difference(cur_rowset_ids, txn_info->rowset_ids, &rowset_ids_to_add, + &rowset_ids_to_del); for (const auto& to_del : rowset_ids_to_del) { delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX}); } @@ -3313,7 +3326,7 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, auto token = _engine.calc_delete_bitmap_executor()->create_token(); RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap, - cur_version - 1, token.get(), rowset_writer)); + cur_version - 1, token.get(), rowset_writer.get())); RETURN_IF_ERROR(token->wait()); std::stringstream ss; @@ -3345,6 +3358,25 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, _remove_sentinel_mark_from_delete_bitmap(delete_bitmap); } + if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update) { + DBUG_EXECUTE_IF("Tablet.update_delete_bitmap.partial_update_write_rowset_fail", { + if (rand() % 100 < (100 * dp->param("percent", 0.5))) { + LOG_WARNING("Tablet.update_delete_bitmap.partial_update_write_rowset random failed") + .tag("txn_id", txn_id); + return Status::InternalError( + "debug update_delete_bitmap partial 
update write rowset random failed"); + } + }); + // build rowset writer and merge transient rowset + RETURN_IF_ERROR(rowset_writer->flush()); + RowsetSharedPtr transient_rowset; + RETURN_IF_ERROR(rowset_writer->build(transient_rowset)); + rowset->merge_rowset_meta(transient_rowset->rowset_meta()); + + // erase segment cache cause we will add a segment to rowset + SegmentLoader::instance()->erase_segments(rowset->rowset_id(), rowset->num_segments()); + } + // update version without write lock, compaction and publish_txn // will update delete bitmap, handle compaction with _rowset_update_lock // and publish_txn runs sequential so no need to lock here diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index d6ad0285233..22721a835a3 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -70,6 +70,7 @@ class TupleDescriptor; class CalcDeleteBitmapToken; enum CompressKind : int; class RowsetBinlogMetasPB; +struct TabletTxnInfo; namespace io { class RemoteFileSystem; @@ -471,10 +472,7 @@ public: const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t txn_id, CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer = nullptr); - Status update_delete_bitmap(const RowsetSharedPtr& rowset, - const RowsetIdUnorderedSet& pre_rowset_ids, - DeleteBitmapPtr delete_bitmap, int64_t txn_id, - RowsetWriter* rowset_writer = nullptr); + Status update_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id); void calc_compaction_output_rowset_delete_bitmap( const std::vector<RowsetSharedPtr>& input_rowsets, const RowIdConversion& rowid_conversion, uint64_t start_version, uint64_t end_version, diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 7a82ef4c6ae..8e0a2439712 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -63,46 +63,6 @@ using std::vector; namespace doris { using namespace ErrorCode; -struct TabletTxnInfo { - PUniqueId load_id; - RowsetSharedPtr rowset; - PendingRowsetGuard pending_rs_guard; - 
bool unique_key_merge_on_write {false}; - DeleteBitmapPtr delete_bitmap; - // records rowsets calc in commit txn - RowsetIdUnorderedSet rowset_ids; - int64_t creation_time; - bool ingest {false}; - std::shared_ptr<PartialUpdateInfo> partial_update_info; - TxnState state {TxnState::PREPARED}; - - TabletTxnInfo() = default; - - TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset) - : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {} - - TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool ingest_arg) - : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()), ingest(ingest_arg) {} - - TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool merge_on_write, - DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& ids) - : load_id(load_id), - rowset(rowset), - unique_key_merge_on_write(merge_on_write), - delete_bitmap(delete_bitmap), - rowset_ids(ids), - creation_time(UnixSeconds()) {} - - void prepare() { state = TxnState::PREPARED; } - void commit() { state = TxnState::COMMITTED; } - void rollback() { state = TxnState::ROLLEDBACK; } - void abort() { - if (state == TxnState::PREPARED) { - state = TxnState::ABORTED; - } - } -}; - TxnManager::TxnManager(StorageEngine& engine, int32_t txn_map_shard_size, int32_t txn_shard_size) : _engine(engine), _txn_map_shard_size(txn_map_shard_size), @@ -521,33 +481,14 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, }); // update delete_bitmap if (tablet_txn_info->unique_key_merge_on_write) { - std::unique_ptr<RowsetWriter> rowset_writer; - RETURN_IF_ERROR(tablet->create_transient_rowset_writer( - rowset, &rowset_writer, tablet_txn_info->partial_update_info)); - int64_t t2 = MonotonicMicros(); - RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset, tablet_txn_info->rowset_ids, - tablet_txn_info->delete_bitmap, transaction_id, - rowset_writer.get())); + RETURN_IF_ERROR(tablet->update_delete_bitmap(tablet_txn_info.get(), transaction_id)); int64_t t3 = 
MonotonicMicros(); stats->calc_delete_bitmap_time_us = t3 - t2; - if (tablet_txn_info->partial_update_info && - tablet_txn_info->partial_update_info->is_partial_update) { - // build rowset writer and merge transient rowset - RETURN_IF_ERROR(rowset_writer->flush()); - RowsetSharedPtr transient_rowset; - RETURN_IF_ERROR(rowset_writer->build(transient_rowset)); - rowset->merge_rowset_meta(transient_rowset->rowset_meta()); - - // erase segment cache cause we will add a segment to rowset - SegmentLoader::instance()->erase_segments(rowset->rowset_id(), rowset->num_segments()); - } - stats->partial_update_write_segment_us = MonotonicMicros() - t3; - int64_t t4 = MonotonicMicros(); RETURN_IF_ERROR(TabletMetaManager::save_delete_bitmap( tablet->data_dir(), tablet->tablet_id(), tablet_txn_info->delete_bitmap, version.second)); - stats->save_meta_time_us = MonotonicMicros() - t4; + stats->save_meta_time_us = MonotonicMicros() - t3; } /// Step 3: add to binlog diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index f7e67d0a462..e33958a66aa 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -62,7 +62,45 @@ enum class TxnState { DELETED = 5, }; -struct TabletTxnInfo; +struct TabletTxnInfo { + PUniqueId load_id; + RowsetSharedPtr rowset; + PendingRowsetGuard pending_rs_guard; + bool unique_key_merge_on_write {false}; + DeleteBitmapPtr delete_bitmap; + // records rowsets calc in commit txn + RowsetIdUnorderedSet rowset_ids; + int64_t creation_time; + bool ingest {false}; + std::shared_ptr<PartialUpdateInfo> partial_update_info; + TxnState state {TxnState::PREPARED}; + + TabletTxnInfo() = default; + + TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset) + : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {} + + TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool ingest_arg) + : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()), ingest(ingest_arg) {} + + TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr 
rowset, bool merge_on_write, + DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& ids) + : load_id(load_id), + rowset(rowset), + unique_key_merge_on_write(merge_on_write), + delete_bitmap(delete_bitmap), + rowset_ids(ids), + creation_time(UnixSeconds()) {} + + void prepare() { state = TxnState::PREPARED; } + void commit() { state = TxnState::COMMITTED; } + void rollback() { state = TxnState::ROLLEDBACK; } + void abort() { + if (state == TxnState::PREPARED) { + state = TxnState::ABORTED; + } + } +}; struct CommitTabletTxnInfo { TTransactionId transaction_id {0}; diff --git a/regression-test/data/fault_injection_p0/concurrency_update1.csv b/regression-test/data/fault_injection_p0/concurrency_update1.csv new file mode 100644 index 00000000000..5bc6c8de802 --- /dev/null +++ b/regression-test/data/fault_injection_p0/concurrency_update1.csv @@ -0,0 +1,21 @@ +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +20 diff --git a/regression-test/data/fault_injection_p0/concurrency_update2.csv b/regression-test/data/fault_injection_p0/concurrency_update2.csv new file mode 100644 index 00000000000..23f43edc585 --- /dev/null +++ b/regression-test/data/fault_injection_p0/concurrency_update2.csv @@ -0,0 +1,21 @@ +0,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +1,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +2,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +3,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +4,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +5,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +6,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa 
+7,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +8,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +9,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +10,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +11,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +12,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +13,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +14,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +15,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +16,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +17,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +18,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +19,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +20,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa diff --git a/regression-test/data/fault_injection_p0/concurrency_update3.csv b/regression-test/data/fault_injection_p0/concurrency_update3.csv new file mode 100644 index 00000000000..e2dd51a2dcc --- /dev/null +++ b/regression-test/data/fault_injection_p0/concurrency_update3.csv @@ -0,0 +1,21 @@ +0,b0 +1,b1 +2,b2 +3,b3 +4,b4 +5,b5 +6,b6 +7,b7 +8,b8 +9,b9 +10,b10 +11,b11 +12,b12 +13,b13 +14,b14 +15,b15 +16,b16 +17,b17 +18,b18 +19,b19 +20,b20 diff --git 
a/regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out b/regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out new file mode 100644 index 00000000000..4e4d3f19f6e --- /dev/null +++ b/regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out @@ -0,0 +1,47 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +0 \N \N \N \N \N +1 \N \N \N \N \N +10 \N \N \N \N \N +11 \N \N \N \N \N +12 \N \N \N \N \N +13 \N \N \N \N \N +14 \N \N \N \N \N +15 \N \N \N \N \N +16 \N \N \N \N \N +17 \N \N \N \N \N +18 \N \N \N \N \N +19 \N \N \N \N \N +2 \N \N \N \N \N +20 \N \N \N \N \N +3 \N \N \N \N \N +4 \N \N \N \N \N +5 \N \N \N \N \N +6 \N \N \N \N \N +7 \N \N \N \N \N +8 \N \N \N \N \N +9 \N \N \N \N \N + +-- !sql -- +0 aaaaaaaaaa b0 \N \N \N +1 aaaaaaaaaa b1 \N \N \N +10 aaaaaaaaaa b10 \N \N \N +11 aaaaaaaaaa b11 \N \N \N +12 aaaaaaaaaa b12 \N \N \N +13 aaaaaaaaaa b13 \N \N \N +14 aaaaaaaaaa b14 \N \N \N +15 aaaaaaaaaa b15 \N \N \N +16 aaaaaaaaaa b16 \N \N \N +17 aaaaaaaaaa b17 \N \N \N +18 aaaaaaaaaa b18 \N \N \N +19 aaaaaaaaaa b19 \N \N \N +2 aaaaaaaaaa b2 \N \N \N +20 aaaaaaaaaa b20 \N \N \N +3 aaaaaaaaaa b3 \N \N \N +4 aaaaaaaaaa b4 \N \N \N +5 aaaaaaaaaa b5 \N \N \N +6 aaaaaaaaaa b6 \N \N \N +7 aaaaaaaaaa b7 \N \N \N +8 aaaaaaaaaa b8 \N \N \N +9 aaaaaaaaaa b9 \N \N \N + diff --git a/regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy b/regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy new file mode 100644 index 00000000000..29b5bcaaed6 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy @@ -0,0 +1,171 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.Date +import java.text.SimpleDateFormat +import org.apache.http.HttpResponse +import org.apache.http.client.methods.HttpPut +import org.apache.http.impl.client.CloseableHttpClient +import org.apache.http.impl.client.HttpClients +import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity +import org.apache.http.client.config.RequestConfig +import org.apache.http.client.RedirectStrategy +import org.apache.http.protocol.HttpContext +import org.apache.http.HttpRequest +import org.apache.http.impl.client.LaxRedirectStrategy +import org.apache.http.client.methods.RequestBuilder +import org.apache.http.entity.StringEntity +import org.apache.http.client.methods.CloseableHttpResponse +import org.apache.http.util.EntityUtils + +suite("test_partial_update_publish_conflict_with_error", "nonConcurrent") { + def dbName = context.config.getDbNameByFile(context.file) + def tableName = "test_partial_update_publish_conflict_with_error" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ CREATE TABLE ${tableName} ( + k1 varchar(20) not null, + v1 varchar(20), + v2 varchar(20), + v3 varchar(20), + v4 varchar(20), + v5 varchar(20)) + UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + 
"light_schema_change" = "true", + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true")""" + + // base data + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', "k1" + + file 'concurrency_update1.csv' + time 10000 // limit inflight 10s + } + sql "sync;" + qt_sql """ select * from ${tableName} order by k1;""" + + // NOTE: use streamload 2pc to construct the conflict of publish + def do_streamload_2pc_commit = { txnId -> + def command = "curl -X PUT --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword}" + + " -H txn_id:${txnId}" + + " -H txn_operation:commit" + + " http://${context.config.feHttpAddress}/api/${dbName}/${tableName}/_stream_load_2pc" + log.info("http_stream execute 2pc: ${command}") + + def process = command.execute() + code = process.waitFor() + out = process.text + json2pc = parseJson(out) + log.info("http_stream 2pc result: ${out}".toString()) + assertEquals(code, 0) + assertEquals("success", json2pc.status.toLowerCase()) + } + + def wait_for_publish = {txnId, waitSecond -> + String st = "PREPARE" + while (!st.equalsIgnoreCase("VISIBLE") && !st.equalsIgnoreCase("ABORTED") && waitSecond > 0) { + Thread.sleep(1000) + waitSecond -= 1 + def result = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}" + assertNotNull(result) + st = result[0].TransactionStatus + } + log.info("Stream load with txn ${txnId} is ${st}") + assertEquals(st, "VISIBLE") + } + + GetDebugPoint().clearDebugPointsForAllBEs() + def dbug_point = 'Tablet.update_delete_bitmap.partial_update_write_rowset_fail' + + // concurrent load 1 + String txnId1 + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'k1,tmp,v1=substr(tmp,1,10)' + set 'strict_mode', "false" + set 'two_phase_commit', 'true' + file 'concurrency_update2.csv' + time 10000 // limit inflight 10s + check { 
result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + txnId1 = json.TxnId + assertEquals("success", json.Status.toLowerCase()) + } + } + + String txnId2 + // concurrent load 2 + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'k1,v2' + set 'strict_mode', "false" + set 'two_phase_commit', 'true' + file 'concurrency_update3.csv' + time 10000 // limit inflight 10s + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + txnId2 = json.TxnId + assertEquals("success", json.Status.toLowerCase()) + } + } + sql "sync;" + + // complete load 1 first + do_streamload_2pc_commit(txnId1) + wait_for_publish(txnId1, 10) + + // inject failure on publish + try { + GetDebugPoint().enableDebugPointForAllBEs(dbug_point, [percent : 1.0]) + do_streamload_2pc_commit(txnId2) + sleep(3000) + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().disableDebugPointForAllBEs(dbug_point) + } + // publish will retry until success + // FE retry may take longer time, wait for 20 secs + wait_for_publish(txnId2, 20) + + sql "sync;" + qt_sql """ select * from ${tableName} order by k1;""" +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org