This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 0dccc4e6e48 [cherry-pick](branch-2.1)fix http error when downloading 
variant inverted index file #35668 (#36061)
0dccc4e6e48 is described below

commit 0dccc4e6e483e559e90b1bce46bdc52c25825aba
Author: Sun Chenyang <csun5...@gmail.com>
AuthorDate: Tue Jun 11 14:09:05 2024 +0800

    [cherry-pick](branch-2.1)fix http error when downloading variant inverted 
index file #35668 (#36061)
    
    pick from master[#35668](https://github.com/apache/doris/pull/35668)
---
 be/src/olap/single_replica_compaction.cpp          |  22 +-
 be/src/olap/task/engine_clone_task.cpp             |  21 +-
 ...ngle_compaction_with_variant_inverted_index.out |   4 +
 ...e_compaction_with_variant_inverted_index.groovy | 254 +++++++++++++++++++++
 4 files changed, 299 insertions(+), 2 deletions(-)

diff --git a/be/src/olap/single_replica_compaction.cpp 
b/be/src/olap/single_replica_compaction.cpp
index f9bd6549d65..c520b23f947 100644
--- a/be/src/olap/single_replica_compaction.cpp
+++ b/be/src/olap/single_replica_compaction.cpp
@@ -17,6 +17,8 @@
 
 #include "olap/single_replica_compaction.h"
 
+#include <curl/curl.h>
+
 #include "common/logging.h"
 #include "gen_cpp/Types_constants.h"
 #include "gen_cpp/internal_service.pb.h"
@@ -396,8 +398,26 @@ Status SingleReplicaCompaction::_download_files(DataDir* 
data_dir,
     uint64_t total_file_size = 0;
     MonotonicStopWatch watch;
     watch.start();
+    auto curl = std::unique_ptr<CURL, 
decltype(&curl_easy_cleanup)>(curl_easy_init(),
+                                                                    
&curl_easy_cleanup);
+    if (!curl) {
+        return Status::InternalError("single compaction init curl failed");
+    }
     for (auto& file_name : file_name_list) {
-        auto remote_file_url = remote_url_prefix + file_name;
+        // The file name of the variant column with the inverted index 
contains %
+        // such as: 
020000000000003f624c4c322c568271060f9b5b274a4a95_0_10133@properties%2Emessage.idx
+        //  
{rowset_id}_{seg_num}_{index_id}_{variant_column_name}{%2E}{extracted_column_name}.idx
+        // We need to handle %, otherwise it will cause an HTTP 404 error.
+        // Because the percent ("%") character serves as the indicator for 
percent-encoded octets,
+        // it must be percent-encoded as "%25" for that octet to be used as 
data within a URI.
+        // https://datatracker.ietf.org/doc/html/rfc3986
+        auto output = std::unique_ptr<char, decltype(&curl_free)>(
+                curl_easy_escape(curl.get(), file_name.c_str(), 
file_name.length()), &curl_free);
+        if (!output) {
+            return Status::InternalError("escape file name failed, file 
name={}", file_name);
+        }
+        std::string encoded_filename(output.get());
+        auto remote_file_url = remote_url_prefix + encoded_filename;
 
         // get file length
         uint64_t file_size = 0;
diff --git a/be/src/olap/task/engine_clone_task.cpp 
b/be/src/olap/task/engine_clone_task.cpp
index 70ec93727e9..81c9973fcf2 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -17,6 +17,7 @@
 
 #include "olap/task/engine_clone_task.h"
 
+#include <curl/curl.h>
 #include <fcntl.h>
 #include <fmt/format.h>
 #include <gen_cpp/AgentService_types.h>
@@ -512,8 +513,26 @@ Status EngineCloneTask::_download_files(DataDir* data_dir, 
const std::string& re
     uint64_t total_file_size = 0;
     MonotonicStopWatch watch;
     watch.start();
+    auto curl = std::unique_ptr<CURL, 
decltype(&curl_easy_cleanup)>(curl_easy_init(),
+                                                                    
&curl_easy_cleanup);
+    if (!curl) {
+        return Status::InternalError("engine clone task init curl failed");
+    }
     for (auto& file_name : file_name_list) {
-        auto remote_file_url = remote_url_prefix + file_name;
+        // The file name of the variant column with the inverted index 
contains %
+        // such as: 
020000000000003f624c4c322c568271060f9b5b274a4a95_0_10133@properties%2Emessage.idx
+        //  
{rowset_id}_{seg_num}_{index_id}_{variant_column_name}{%2E}{extracted_column_name}.idx
+        // We need to handle %, otherwise it will cause an HTTP 404 error.
+        // Because the percent ("%") character serves as the indicator for 
percent-encoded octets,
+        // it must be percent-encoded as "%25" for that octet to be used as 
data within a URI.
+        // https://datatracker.ietf.org/doc/html/rfc3986
+        auto output = std::unique_ptr<char, decltype(&curl_free)>(
+                curl_easy_escape(curl.get(), file_name.c_str(), 
file_name.length()), &curl_free);
+        if (!output) {
+            return Status::InternalError("escape file name failed, file 
name={}", file_name);
+        }
+        std::string encoded_filename(output.get());
+        auto remote_file_url = remote_url_prefix + encoded_filename;
 
         // get file length
         uint64_t file_size = 0;
diff --git 
a/regression-test/data/compaction/test_single_compaction_with_variant_inverted_index.out
 
b/regression-test/data/compaction/test_single_compaction_with_variant_inverted_index.out
new file mode 100644
index 00000000000..045b7675a91
--- /dev/null
+++ 
b/regression-test/data/compaction/test_single_compaction_with_variant_inverted_index.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+6
+
diff --git 
a/regression-test/suites/compaction/test_single_compaction_with_variant_inverted_index.groovy
 
b/regression-test/suites/compaction/test_single_compaction_with_variant_inverted_index.groovy
new file mode 100644
index 00000000000..c31d7ab7c28
--- /dev/null
+++ 
b/regression-test/suites/compaction/test_single_compaction_with_variant_inverted_index.groovy
@@ -0,0 +1,254 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_single_compaction_with_variant_inverted", "p2") {
+    def tableName = "test_single_compaction_with_variant_inverted"
+  
+    def set_be_config = { key, value ->
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, 
backendId_to_backendHttpPort);
+
+        for (String backend_id: backendId_to_backendIP.keySet()) {
+            def (code, out, err) = 
update_be_config(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", 
err=" + err)
+        }
+    }
+
+    def calc_file_crc_on_tablet = { ip, port, tablet ->
+        return curl("GET", 
String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet))
+    }
+
+    boolean disableAutoCompaction = true
+    boolean has_update_be_config = false
+    try {
+        String backend_id;
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, 
backendId_to_backendHttpPort);
+
+        backend_id = backendId_to_backendIP.keySet()[0]
+        def (code, out, err) = 
show_be_config(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id))
+        
+        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + 
err)
+        assertEquals(code, 0)
+        def configList = parseJson(out.trim())
+        assert configList instanceof List
+
+        for (Object ele in (List) configList) {
+            assert ele instanceof List<String>
+            if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                disableAutoCompaction = Boolean.parseBoolean(((List<String>) 
ele)[2])
+            }
+        }
+        set_be_config.call("disable_auto_compaction", "true")
+        has_update_be_config = true
+
+        def triggerCompaction = { be_host, be_http_port, compact_type, 
tablet_id ->
+            StringBuilder sb = new StringBuilder();
+            sb.append("curl -X POST http://${be_host}:${be_http_port}")
+            sb.append("/api/compaction/run?tablet_id=")
+            sb.append(tablet_id)
+            sb.append("&compact_type=${compact_type}")
+
+            String command = sb.toString()
+            logger.info(command)
+            process = command.execute()
+            code = process.waitFor()
+            err = IOGroovyMethods.getText(new BufferedReader(new 
InputStreamReader(process.getErrorStream())));
+            out = process.getText()
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", 
disableAutoCompaction " + disableAutoCompaction + ", err=" + err)
+            if (!disableAutoCompaction) {
+                return "Success, " + out
+            }
+            assertEquals(code, 0)
+            return out
+        } 
+
+        def triggerSingleCompaction = { be_host, be_http_port, tablet_id ->
+            StringBuilder sb = new StringBuilder();
+            sb.append("curl -X POST http://${be_host}:${be_http_port}")
+            sb.append("/api/compaction/run?tablet_id=")
+            sb.append(tablet_id)
+            sb.append("&compact_type=cumulative&remote=true")
+
+            String command = sb.toString()
+            logger.info(command)
+            process = command.execute()
+            code = process.waitFor()
+            err = IOGroovyMethods.getText(new BufferedReader(new 
InputStreamReader(process.getErrorStream())));
+            out = process.getText()
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", 
disableAutoCompaction " + disableAutoCompaction + ", err=" + err)
+            if (!disableAutoCompaction) {
+                return "Success, " + out
+            }
+            assertEquals(code, 0)
+            return out
+        }
+        def waitForCompaction = { be_host, be_http_port, tablet_id ->
+            boolean running = true
+            do {
+                Thread.sleep(1000)
+                StringBuilder sb = new StringBuilder();
+                sb.append("curl -X GET http://${be_host}:${be_http_port}")
+                sb.append("/api/compaction/run_status?tablet_id=")
+                sb.append(tablet_id)
+
+                String command = sb.toString()
+                logger.info(command)
+                process = command.execute()
+                code = process.waitFor()
+                out = process.getText()
+                logger.info("Get compaction status: code=" + code + ", out=" + 
out)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            } while (running)
+        }
+
+        def getTabletStatus = { be_host, be_http_port, tablet_id ->
+            boolean running = true
+            Thread.sleep(1000)
+            StringBuilder sb = new StringBuilder();
+            sb.append("curl -X GET http://${be_host}:${be_http_port}")
+            sb.append("/api/compaction/show?tablet_id=")
+            sb.append(tablet_id)
+
+            String command = sb.toString()
+            logger.info(command)
+            process = command.execute()
+            code = process.waitFor()
+            out = process.getText()
+            logger.info("Get tablet status: code=" + code + ", out=" + out)
+            assertEquals(code, 0)
+            def tabletStatus = parseJson(out.trim())
+            return tabletStatus
+        }
+
+
+        sql """ DROP TABLE IF EXISTS ${tableName}; """
+        sql """
+            CREATE TABLE ${tableName} (
+                `id` int(11) NULL,
+                `name` varchar(255) NULL,
+                `score` int(11) NULL,
+                `properties` variant,
+                INDEX idx_props (`properties`) USING INVERTED 
PROPERTIES("parser" = "none") COMMENT ''
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`id`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES ( "replication_num" = "2", 
"enable_single_replica_compaction" = "true", "inverted_index_storage_format" = 
"V1");
+        """
+
+        def tablets = sql_return_maparray """ show tablets from ${tableName}; 
"""
+
+        // wait for update replica infos
+        // be.conf: update_replica_infos_interval_seconds + 2s
+        Thread.sleep(62000)
+        
+        // find the master be for single replica compaction
+        Boolean found = false
+        String master_backend_id;
+        List<String> follower_backend_id = new ArrayList<>()
+        // The test table only has one bucket with 2 replicas,
+        // and `show tablets` will return 2 different replicas with the same 
tablet.
+        // So we can use the same tablet_id to get tablet/trigger compaction 
with different backends.
+        String tablet_id = tablets[0].TabletId
+        def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """
+        logger.info("tablet: " + tablet_info)
+        for (def tablet in tablets) {
+            String trigger_backend_id = tablet.BackendId
+            def tablet_status = 
getTabletStatus(backendId_to_backendIP[trigger_backend_id], 
backendId_to_backendHttpPort[trigger_backend_id], tablet_id);
+            if (!tablet_status.containsKey("single replica compaction 
status")) {
+                if (found) {
+                    logger.warn("multipe master");
+                    assertTrue(false)
+                }
+                found = true
+                master_backend_id = trigger_backend_id
+            } else {
+                follower_backend_id.add(trigger_backend_id)
+            }
+        }
+
+        def checkCompactionResult = {
+            def master_tablet_status = 
getTabletStatus(backendId_to_backendIP[master_backend_id], 
backendId_to_backendHttpPort[master_backend_id], tablet_id);
+            def master_rowsets = master_tablet_status."rowsets"
+            assert master_rowsets instanceof List
+            logger.info("rowset size: " + master_rowsets.size())
+
+            for (String backend: follower_backend_id) {
+                def tablet_status = 
getTabletStatus(backendId_to_backendIP[backend], 
backendId_to_backendHttpPort[backend], tablet_id);
+                def rowsets = tablet_status."rowsets"
+                assert rowsets instanceof List
+                assertEquals(master_rowsets.size(), rowsets.size())
+            }
+        }
+
+        def checkTabletFileCrc = {
+            def (master_code, master_out, master_err) = 
calc_file_crc_on_tablet(backendId_to_backendIP[master_backend_id], 
backendId_to_backendHttpPort[master_backend_id], tablet_id)
+            logger.info("Run calc_file_crc_on_tablet: ip=" + 
backendId_to_backendIP[master_backend_id] + " code=" + master_code + ", out=" + 
master_out + ", err=" + master_err)
+
+            for (String backend: follower_backend_id) {
+                def (follower_code, follower_out, follower_err) = 
calc_file_crc_on_tablet(backendId_to_backendIP[backend], 
backendId_to_backendHttpPort[backend], tablet_id)
+                logger.info("Run calc_file_crc_on_tablet: ip=" + 
backendId_to_backendIP[backend] + " code=" + follower_code + ", out=" + 
follower_out + ", err=" + follower_err)
+                assertTrue(parseJson(follower_out.trim()).crc_value == 
parseJson(master_out.trim()).crc_value)
+                assertTrue(parseJson(follower_out.trim()).start_version == 
parseJson(master_out.trim()).start_version)
+                assertTrue(parseJson(follower_out.trim()).end_version == 
parseJson(master_out.trim()).end_version)
+                assertTrue(parseJson(follower_out.trim()).file_count == 
parseJson(master_out.trim()).file_count)
+                assertTrue(parseJson(follower_out.trim()).rowset_count == 
parseJson(master_out.trim()).rowset_count)
+            }
+        }
+
+        sql """ INSERT INTO ${tableName} VALUES (1, "a", 100, '{"a" : 1234, 
"point" : 1, "xxxx" : "ddddd"}'); """
+        sql """ INSERT INTO ${tableName} VALUES (1, "b", 100, '{"%a" : 1234, 
"@point" : 1, "[xxxx" : "ddddd"}'); """
+        sql """ INSERT INTO ${tableName} VALUES (2, "a", 100, '{"@a" : 1234, 
"%point" : 1, "]xxxx" : "ddddd"}'); """
+        sql """ INSERT INTO ${tableName} VALUES (2, "b", 100, '{"%a" : 1234, 
"%point" : 1, "{xxxx" : "ddddd"}'); """
+        sql """ INSERT INTO ${tableName} VALUES (3, "a", 100, '{"@a" : 1234, 
"@point" : 1, "}xxxx" : "ddddd"}'); """
+        sql """ INSERT INTO ${tableName} VALUES (3, "b", 100, '{"a" : 1234, 
"point" : 1, "|xxxx" : "ddddd"}'); """
+
+        // trigger master be to do full compaction
+        
assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id], 
backendId_to_backendHttpPort[master_backend_id],
+                    "full", tablet_id).contains("Success")); 
+        waitForCompaction(backendId_to_backendIP[master_backend_id], 
backendId_to_backendHttpPort[master_backend_id], tablet_id)
+
+        // trigger follower be to fetch compaction result
+        for (String id in follower_backend_id) {
+            assertTrue(triggerSingleCompaction(backendId_to_backendIP[id], 
backendId_to_backendHttpPort[id], tablet_id).contains("Success")); 
+            waitForCompaction(backendId_to_backendIP[id], 
backendId_to_backendHttpPort[id], tablet_id)
+        }
+
+        // check rowsets
+        checkCompactionResult.call()
+        checkTabletFileCrc.call()
+
+        qt_sql """
+        select count() from  ${tableName} where properties MATCH_ANY 'point 
xxxx';
+        """
+
+        sql """ DROP TABLE IF EXISTS ${tableName}; """
+  
+    } finally {
+        if (has_update_be_config) {
+            set_be_config.call("disable_auto_compaction", 
disableAutoCompaction.toString())
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to