This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 0dccc4e6e48 [cherry-pick](branch-2.1)fix http error when downloading varaint inverted index file #35668 (#36061) 0dccc4e6e48 is described below commit 0dccc4e6e483e559e90b1bce46bdc52c25825aba Author: Sun Chenyang <csun5...@gmail.com> AuthorDate: Tue Jun 11 14:09:05 2024 +0800 [cherry-pick](branch-2.1)fix http error when downloading varaint inverted index file #35668 (#36061) pick from master[#35668](https://github.com/apache/doris/pull/35668) --- be/src/olap/single_replica_compaction.cpp | 22 +- be/src/olap/task/engine_clone_task.cpp | 21 +- ...ngle_compaction_with_variant_inverted_index.out | 4 + ...e_compaction_with_variant_inverted_index.groovy | 254 +++++++++++++++++++++ 4 files changed, 299 insertions(+), 2 deletions(-) diff --git a/be/src/olap/single_replica_compaction.cpp b/be/src/olap/single_replica_compaction.cpp index f9bd6549d65..c520b23f947 100644 --- a/be/src/olap/single_replica_compaction.cpp +++ b/be/src/olap/single_replica_compaction.cpp @@ -17,6 +17,8 @@ #include "olap/single_replica_compaction.h" +#include <curl/curl.h> + #include "common/logging.h" #include "gen_cpp/Types_constants.h" #include "gen_cpp/internal_service.pb.h" @@ -396,8 +398,26 @@ Status SingleReplicaCompaction::_download_files(DataDir* data_dir, uint64_t total_file_size = 0; MonotonicStopWatch watch; watch.start(); + auto curl = std::unique_ptr<CURL, decltype(&curl_easy_cleanup)>(curl_easy_init(), + &curl_easy_cleanup); + if (!curl) { + return Status::InternalError("single compaction init curl failed"); + } for (auto& file_name : file_name_list) { - auto remote_file_url = remote_url_prefix + file_name; + // The file name of the variant column with the inverted index contains % + // such as: 020000000000003f624c4c322c568271060f9b5b274a4a95_0_10133@properties%2Emessage.idx + // {rowset_id}_{seg_num}_{index_id}_{variant_column_name}{%2E}{extracted_column_name}.idx + // We need to handle %, otherwise it will cause an HTTP 404 error. + // Because the percent ("%") character serves as the indicator for percent-encoded octets, + // it must be percent-encoded as "%25" for that octet to be used as data within a URI. + // https://datatracker.ietf.org/doc/html/rfc3986 + auto output = std::unique_ptr<char, decltype(&curl_free)>( + curl_easy_escape(curl.get(), file_name.c_str(), file_name.length()), &curl_free); + if (!output) { + return Status::InternalError("escape file name failed, file name={}", file_name); + } + std::string encoded_filename(output.get()); + auto remote_file_url = remote_url_prefix + encoded_filename; // get file length uint64_t file_size = 0; diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 70ec93727e9..81c9973fcf2 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -17,6 +17,7 @@ #include "olap/task/engine_clone_task.h" +#include <curl/curl.h> #include <fcntl.h> #include <fmt/format.h> #include <gen_cpp/AgentService_types.h> @@ -512,8 +513,26 @@ Status EngineCloneTask::_download_files(DataDir* data_dir, const std::string& re uint64_t total_file_size = 0; MonotonicStopWatch watch; watch.start(); + auto curl = std::unique_ptr<CURL, decltype(&curl_easy_cleanup)>(curl_easy_init(), + &curl_easy_cleanup); + if (!curl) { + return Status::InternalError("engine clone task init curl failed"); + } for (auto& file_name : file_name_list) { - auto remote_file_url = remote_url_prefix + file_name; + // The file name of the variant column with the inverted index contains % + // such as: 020000000000003f624c4c322c568271060f9b5b274a4a95_0_10133@properties%2Emessage.idx + // {rowset_id}_{seg_num}_{index_id}_{variant_column_name}{%2E}{extracted_column_name}.idx + // We need to handle %, otherwise it will cause an HTTP 404 error. + // Because the percent ("%") character serves as the indicator for percent-encoded octets, + // it must be percent-encoded as "%25" for that octet to be used as data within a URI. + // https://datatracker.ietf.org/doc/html/rfc3986 + auto output = std::unique_ptr<char, decltype(&curl_free)>( + curl_easy_escape(curl.get(), file_name.c_str(), file_name.length()), &curl_free); + if (!output) { + return Status::InternalError("escape file name failed, file name={}", file_name); + } + std::string encoded_filename(output.get()); + auto remote_file_url = remote_url_prefix + encoded_filename; // get file length uint64_t file_size = 0; diff --git a/regression-test/data/compaction/test_single_compaction_with_variant_inverted_index.out b/regression-test/data/compaction/test_single_compaction_with_variant_inverted_index.out new file mode 100644 index 00000000000..045b7675a91 --- /dev/null +++ b/regression-test/data/compaction/test_single_compaction_with_variant_inverted_index.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +6 + diff --git a/regression-test/suites/compaction/test_single_compaction_with_variant_inverted_index.groovy b/regression-test/suites/compaction/test_single_compaction_with_variant_inverted_index.groovy new file mode 100644 index 00000000000..c31d7ab7c28 --- /dev/null +++ b/regression-test/suites/compaction/test_single_compaction_with_variant_inverted_index.groovy @@ -0,0 +1,254 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_single_compaction_with_variant_inverted", "p2") { + def tableName = "test_single_compaction_with_variant_inverted" + + def set_be_config = { key, value -> + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + } + + def calc_file_crc_on_tablet = { ip, port, tablet -> + return curl("GET", String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet)) + } + + boolean disableAutoCompaction = true + boolean has_update_be_config = false + try { + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + + for (Object ele in (List) configList) { + assert ele instanceof List<String> + if (((List<String>) ele)[0] == "disable_auto_compaction") { + disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2]) + } + } + set_be_config.call("disable_auto_compaction", "true") + has_update_be_config = true + + def triggerCompaction = { be_host, be_http_port, compact_type, tablet_id -> + StringBuilder sb = new StringBuilder(); + sb.append("curl -X POST http://${be_host}:${be_http_port}") + sb.append("/api/compaction/run?tablet_id=") + sb.append(tablet_id) + sb.append("&compact_type=${compact_type}") + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))); + out = process.getText() + logger.info("Run compaction: code=" + code + ", out=" + out + ", disableAutoCompaction " + disableAutoCompaction + ", err=" + err) + if (!disableAutoCompaction) { + return "Success, " + out + } + assertEquals(code, 0) + return out + } + + def triggerSingleCompaction = { be_host, be_http_port, tablet_id -> + StringBuilder sb = new StringBuilder(); + sb.append("curl -X POST http://${be_host}:${be_http_port}") + sb.append("/api/compaction/run?tablet_id=") + sb.append(tablet_id) + sb.append("&compact_type=cumulative&remote=true") + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))); + out = process.getText() + logger.info("Run compaction: code=" + code + ", out=" + out + ", disableAutoCompaction " + disableAutoCompaction + ", err=" + err) + if (!disableAutoCompaction) { + return "Success, " + out + } + assertEquals(code, 0) + return out + } + def waitForCompaction = { be_host, be_http_port, tablet_id -> + boolean running = true + do { + Thread.sleep(1000) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/run_status?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get compaction status: code=" + code + ", out=" + out) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + def getTabletStatus = { be_host, be_http_port, tablet_id -> + boolean running = true + Thread.sleep(1000) + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/compaction/show?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get tablet status: code=" + code + ", out=" + out) + assertEquals(code, 0) + def tabletStatus = parseJson(out.trim()) + return tabletStatus + } + + + sql """ DROP TABLE IF EXISTS ${tableName}; """ + sql """ + CREATE TABLE ${tableName} ( + `id` int(11) NULL, + `name` varchar(255) NULL, + `score` int(11) NULL, + `properties` variant, + INDEX idx_props (`properties`) USING INVERTED PROPERTIES("parser" = "none") COMMENT '' + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( "replication_num" = "2", "enable_single_replica_compaction" = "true", "inverted_index_storage_format" = "V1"); + """ + + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + + // wait for update replica infos + // be.conf: update_replica_infos_interval_seconds + 2s + Thread.sleep(62000) + + // find the master be for single replica compaction + Boolean found = false + String master_backend_id; + List<String> follower_backend_id = new ArrayList<>() + // The test table only has one bucket with 2 replicas, + // and `show tablets` will return 2 different replicas with the same tablet. + // So we can use the same tablet_id to get tablet/trigger compaction with different backends. + String tablet_id = tablets[0].TabletId + def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """ + logger.info("tablet: " + tablet_info) + for (def tablet in tablets) { + String trigger_backend_id = tablet.BackendId + def tablet_status = getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); + if (!tablet_status.containsKey("single replica compaction status")) { + if (found) { + logger.warn("multipe master"); + assertTrue(false) + } + found = true + master_backend_id = trigger_backend_id + } else { + follower_backend_id.add(trigger_backend_id) + } + } + + def checkCompactionResult = { + def master_tablet_status = getTabletStatus(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], tablet_id); + def master_rowsets = master_tablet_status."rowsets" + assert master_rowsets instanceof List + logger.info("rowset size: " + master_rowsets.size()) + + for (String backend: follower_backend_id) { + def tablet_status = getTabletStatus(backendId_to_backendIP[backend], backendId_to_backendHttpPort[backend], tablet_id); + def rowsets = tablet_status."rowsets" + assert rowsets instanceof List + assertEquals(master_rowsets.size(), rowsets.size()) + } + } + + def checkTabletFileCrc = { + def (master_code, master_out, master_err) = calc_file_crc_on_tablet(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], tablet_id) + logger.info("Run calc_file_crc_on_tablet: ip=" + backendId_to_backendIP[master_backend_id] + " code=" + master_code + ", out=" + master_out + ", err=" + master_err) + + for (String backend: follower_backend_id) { + def (follower_code, follower_out, follower_err) = calc_file_crc_on_tablet(backendId_to_backendIP[backend], backendId_to_backendHttpPort[backend], tablet_id) + logger.info("Run calc_file_crc_on_tablet: ip=" + backendId_to_backendIP[backend] + " code=" + follower_code + ", out=" + follower_out + ", err=" + follower_err) + assertTrue(parseJson(follower_out.trim()).crc_value == parseJson(master_out.trim()).crc_value) + assertTrue(parseJson(follower_out.trim()).start_version == parseJson(master_out.trim()).start_version) + assertTrue(parseJson(follower_out.trim()).end_version == parseJson(master_out.trim()).end_version) + assertTrue(parseJson(follower_out.trim()).file_count == parseJson(master_out.trim()).file_count) + assertTrue(parseJson(follower_out.trim()).rowset_count == parseJson(master_out.trim()).rowset_count) + } + } + + sql """ INSERT INTO ${tableName} VALUES (1, "a", 100, '{"a" : 1234, "point" : 1, "xxxx" : "ddddd"}'); """ + sql """ INSERT INTO ${tableName} VALUES (1, "b", 100, '{"%a" : 1234, "@point" : 1, "[xxxx" : "ddddd"}'); """ + sql """ INSERT INTO ${tableName} VALUES (2, "a", 100, '{"@a" : 1234, "%point" : 1, "]xxxx" : "ddddd"}'); """ + sql """ INSERT INTO ${tableName} VALUES (2, "b", 100, '{"%a" : 1234, "%point" : 1, "{xxxx" : "ddddd"}'); """ + sql """ INSERT INTO ${tableName} VALUES (3, "a", 100, '{"@a" : 1234, "@point" : 1, "}xxxx" : "ddddd"}'); """ + sql """ INSERT INTO ${tableName} VALUES (3, "b", 100, '{"a" : 1234, "point" : 1, "|xxxx" : "ddddd"}'); """ + + // trigger master be to do full compaction + assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], + "full", tablet_id).contains("Success")); + waitForCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], tablet_id) + + // trigger follower be to fetch compaction result + for (String id in follower_backend_id) { + assertTrue(triggerSingleCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id).contains("Success")); + waitForCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id) + } + + // check rowsets + checkCompactionResult.call() + checkTabletFileCrc.call() + + qt_sql """ + select count() from ${tableName} where properties MATCH_ANY 'point xxxx'; + """ + + sql """ DROP TABLE IF EXISTS ${tableName}; """ + + } finally { + if (has_update_be_config) { + set_be_config.call("disable_auto_compaction", disableAutoCompaction.toString()) + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org