This is an automated email from the ASF dual-hosted git repository. airborne pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 070914e08e6 [cherry-pick](branch-3.0) add inverted index storage format V2 (#37419) 070914e08e6 is described below commit 070914e08e6752550033206f58a5881dc0f3f900 Author: Sun Chenyang <csun5...@gmail.com> AuthorDate: Fri Jul 12 13:56:22 2024 +0800 [cherry-pick](branch-3.0) add inverted index storage format V2 (#37419) ## Proposed changes pick from master #37336 --- be/src/cloud/pb_convert.cpp | 4 + .../segment_v2/inverted_index_file_writer.cpp | 5 + .../org/apache/doris/alter/CloudRollupJobV2.java | 2 +- .../apache/doris/alter/CloudSchemaChangeJobV2.java | 3 +- .../cloud/datasource/CloudInternalCatalog.java | 14 ++- .../storage_format/test_storage_format_v2.out | 7 ++ .../storage_format/test_storage_format_v2.groovy | 132 +++++++++++++++++++++ 7 files changed, 163 insertions(+), 4 deletions(-) diff --git a/be/src/cloud/pb_convert.cpp b/be/src/cloud/pb_convert.cpp index 24f72fd26d9..24bdadead33 100644 --- a/be/src/cloud/pb_convert.cpp +++ b/be/src/cloud/pb_convert.cpp @@ -276,6 +276,7 @@ void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, const TabletSchemaPB out->mutable_cluster_key_idxes()->CopyFrom(in.cluster_key_idxes()); out->set_is_dynamic_schema(in.is_dynamic_schema()); out->mutable_row_store_column_unique_ids()->CopyFrom(in.row_store_column_unique_ids()); + out->set_inverted_index_storage_format(in.inverted_index_storage_format()); } void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, TabletSchemaPB&& in) { @@ -301,6 +302,7 @@ void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, TabletSchemaPB&& in) out->mutable_cluster_key_idxes()->Swap(in.mutable_cluster_key_idxes()); out->set_is_dynamic_schema(in.is_dynamic_schema()); out->mutable_row_store_column_unique_ids()->Swap(in.mutable_row_store_column_unique_ids()); + out->set_inverted_index_storage_format(in.inverted_index_storage_format()); } TabletSchemaPB cloud_tablet_schema_to_doris(const TabletSchemaCloudPB& in) { @@ -339,6 +341,7 @@ void cloud_tablet_schema_to_doris(TabletSchemaPB* out, const TabletSchemaCloudPB out->mutable_cluster_key_idxes()->CopyFrom(in.cluster_key_idxes()); out->set_is_dynamic_schema(in.is_dynamic_schema()); out->mutable_row_store_column_unique_ids()->CopyFrom(in.row_store_column_unique_ids()); + out->set_inverted_index_storage_format(in.inverted_index_storage_format()); } void cloud_tablet_schema_to_doris(TabletSchemaPB* out, TabletSchemaCloudPB&& in) { @@ -365,6 +368,7 @@ void cloud_tablet_schema_to_doris(TabletSchemaPB* out, TabletSchemaCloudPB&& in) out->mutable_cluster_key_idxes()->Swap(in.mutable_cluster_key_idxes()); out->set_is_dynamic_schema(in.is_dynamic_schema()); out->mutable_row_store_column_unique_ids()->Swap(in.mutable_row_store_column_unique_ids()); + out->set_inverted_index_storage_format(in.inverted_index_storage_format()); } TabletMetaCloudPB doris_tablet_meta_to_cloud(const TabletMetaPB& in) { diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp index cdd26fecf87..170a21872df 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp @@ -113,6 +113,11 @@ Status InvertedIndexFileWriter::close() { if (_indices_dirs.empty()) { return Status::OK(); } + DBUG_EXECUTE_IF("inverted_index_storage_format_must_be_v2", { + if (_storage_format != InvertedIndexStorageFormatPB::V2) { + _CLTHROWA(CL_ERR_IO, "inverted index storage format must be v2"); + } + }) if (_storage_format == InvertedIndexStorageFormatPB::V1) { try { _file_size = write_v1(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java index 688c2cd17cd..5764a8fbc3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java @@ -209,7 +209,7 @@ public class CloudRollupJobV2 extends RollupJobV2 { tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), tbl.getTimeSeriesCompactionLevelThreshold(), tbl.disableAutoCompaction(), - tbl.getRowStoreColumnsUniqueIds(rowStoreColumns)); + tbl.getRowStoreColumnsUniqueIds(rowStoreColumns), null); requestBuilder.addTabletMetas(builder); } // end for rollupTablets ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java index 3968f2d274f..2c7c4c27bff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java @@ -227,7 +227,8 @@ public class CloudSchemaChangeJobV2 extends SchemaChangeJobV2 { tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), tbl.getTimeSeriesCompactionLevelThreshold(), tbl.disableAutoCompaction(), - tbl.getRowStoreColumnsUniqueIds(rowStoreColumns)); + tbl.getRowStoreColumnsUniqueIds(rowStoreColumns), + tbl.getInvertedIndexFileStorageFormat()); requestBuilder.addTabletMetas(builder); } // end for rollupTablets ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java index 3ebc9d13808..541b884da14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java @@ -63,6 +63,7 @@ import org.apache.doris.proto.Types; import org.apache.doris.qe.ConnectContext; import org.apache.doris.rpc.RpcException; import org.apache.doris.thrift.TCompressionType; +import org.apache.doris.thrift.TInvertedIndexFileStorageFormat; import org.apache.doris.thrift.TSortType; import org.apache.doris.thrift.TTabletType; @@ -169,7 +170,8 @@ public class CloudInternalCatalog extends InternalCatalog { tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), tbl.getTimeSeriesCompactionLevelThreshold(), tbl.disableAutoCompaction(), - tbl.getRowStoreColumnsUniqueIds(rowStoreColumns)); + tbl.getRowStoreColumnsUniqueIds(rowStoreColumns), + tbl.getInvertedIndexFileStorageFormat()); requestBuilder.addTabletMetas(builder); } if (!storageVaultIdSet && ((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault()) { @@ -216,7 +218,8 @@ public class CloudInternalCatalog extends InternalCatalog { Long timeSeriesCompactionGoalSizeMbytes, Long timeSeriesCompactionFileCountThreshold, Long timeSeriesCompactionTimeThresholdSeconds, Long timeSeriesCompactionEmptyRowsetsThreshold, Long timeSeriesCompactionLevelThreshold, boolean disableAutoCompaction, - List<Integer> rowStoreColumnUniqueIds) throws DdlException { + List<Integer> rowStoreColumnUniqueIds, + TInvertedIndexFileStorageFormat invertedIndexFileStorageFormat) throws DdlException { OlapFile.TabletMetaCloudPB.Builder builder = OlapFile.TabletMetaCloudPB.newBuilder(); builder.setTableId(tableId); builder.setIndexId(indexId); @@ -334,6 +337,13 @@ public class CloudInternalCatalog extends InternalCatalog { } schemaBuilder.setDisableAutoCompaction(disableAutoCompaction); + if (invertedIndexFileStorageFormat != null) { + if (invertedIndexFileStorageFormat == TInvertedIndexFileStorageFormat.V1) { + schemaBuilder.setInvertedIndexStorageFormat(OlapFile.InvertedIndexStorageFormatPB.V1); + } else { + schemaBuilder.setInvertedIndexStorageFormat(OlapFile.InvertedIndexStorageFormatPB.V2); + } + } OlapFile.TabletSchemaCloudPB schema = schemaBuilder.build(); builder.setSchema(schema); // rowset diff --git a/regression-test/data/inverted_index_p0/storage_format/test_storage_format_v2.out b/regression-test/data/inverted_index_p0/storage_format/test_storage_format_v2.out new file mode 100644 index 00000000000..d9eb14edf42 --- /dev/null +++ b/regression-test/data/inverted_index_p0/storage_format/test_storage_format_v2.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +4315 + +-- !sql -- +4315 + diff --git a/regression-test/suites/inverted_index_p0/storage_format/test_storage_format_v2.groovy b/regression-test/suites/inverted_index_p0/storage_format/test_storage_format_v2.groovy new file mode 100644 index 00000000000..1b4d28e1033 --- /dev/null +++ b/regression-test/suites/inverted_index_p0/storage_format/test_storage_format_v2.groovy @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_storage_format_v2", "p0, nonConcurrent") { + // define a sql table + def testTable = "httplogs_dup_v1" + + def create_httplogs_dup_table = {test_table -> + // multi-line sql + def result = sql """ + CREATE TABLE IF NOT EXISTS ${test_table} ( + `@timestamp` int(11) NULL, + `clientip` varchar(20) NULL, + `request` string NULL, + `status` int(11) NULL, + `size` int(11) NULL, + INDEX size_idx (`size`) USING INVERTED COMMENT '', + INDEX status_idx (`status`) USING INVERTED COMMENT '', + INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '', + INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser"="english") COMMENT '' + ) ENGINE=OLAP + DUPLICATE KEY(`@timestamp`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 2 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "inverted_index_storage_format" = "V2", + "disable_auto_compaction" = "true" + ); + """ + } + + def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false, + expected_succ_rows = -1 -> + + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'label', label + "_" + UUID.randomUUID().toString() + set 'read_json_by_line', read_flag + set 'format', format_flag + file file_name // import json file + time 10000 // limit inflight 10s + if (expected_succ_rows >= 0) { + set 'max_filter_ratio', '1' + } + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + check { result, exception, startTime, endTime -> + if (ignore_failure && expected_succ_rows < 0) { return } + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + if (expected_succ_rows >= 0) { + assertEquals(json.NumberLoadedRows, expected_succ_rows) + } else { + assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + } + + try { + sql "DROP TABLE IF EXISTS ${testTable}" + create_httplogs_dup_table.call(testTable) + + GetDebugPoint().enableDebugPointForAllBEs("inverted_index_storage_format_must_be_v2") + GetDebugPoint().enableDebugPointForAllBEs("match.invert_index_not_support_execute_match") + load_httplogs_data.call(testTable, 'label1', 'true', 'json', 'documents-1000.json') + load_httplogs_data.call(testTable, 'label2', 'true', 'json', 'documents-1000.json') + load_httplogs_data.call(testTable, 'label3', 'true', 'json', 'documents-1000.json') + load_httplogs_data.call(testTable, 'label4', 'true', 'json', 'documents-1000.json') + load_httplogs_data.call(testTable, 'label5', 'true', 'json', 'documents-1000.json') + sql "sync" + + qt_sql(" select COUNT(*) from ${testTable} where request match 'images' ") + + def getJobState = { indexName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${indexName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + def wait_for_schema_change = { -> + int max_try_time = 3000 + while (max_try_time--){ + String result = getJobState(testTable) + if (result == "FINISHED") { + sleep(3000) + break + } else { + if (result == "RUNNING") { + sleep(3000) + } + if (max_try_time < 1){ + assertEquals(1,2) + } + } + } + } + + sql """ ALTER TABLE ${testTable} modify COLUMN status text""" + wait_for_schema_change.call() + + qt_sql(" select COUNT(*) from ${testTable} where request match 'images' ") + + } finally { + sql("DROP TABLE IF EXISTS ${testTable}") + GetDebugPoint().disableDebugPointForAllBEs("inverted_index_storage_format_must_be_v2") + GetDebugPoint().disableDebugPointForAllBEs("match.invert_index_not_support_execute_match") + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org