This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 145afabedc81f4029c7ae35d3b71016956cf87c6
Author: wudi <676366...@qq.com>
AuthorDate: Mon Jul 1 10:01:41 2024 +0800

    [regression-test](connector) Add a case for the response of streamload that 
the connector depends (#36864)
---
 .../data/flink_connector_p0/test_response.csv      |   2 +
 .../flink_connector_response.groovy                | 186 +++++++++++++++++++++
 2 files changed, 188 insertions(+)

diff --git a/regression-test/data/flink_connector_p0/test_response.csv 
b/regression-test/data/flink_connector_p0/test_response.csv
new file mode 100644
index 00000000000..b9fc6ccd99c
--- /dev/null
+++ b/regression-test/data/flink_connector_p0/test_response.csv
@@ -0,0 +1,2 @@
+1,zhangsan
+2,lisi
diff --git 
a/regression-test/suites/flink_connector_p0/flink_connector_response.groovy 
b/regression-test/suites/flink_connector_p0/flink_connector_response.groovy
new file mode 100644
index 00000000000..c5aa754a52a
--- /dev/null
+++ b/regression-test/suites/flink_connector_p0/flink_connector_response.groovy
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License./
+
+import java.util.regex.Pattern
+
+/**
+ * Flink connector depends on these responses
+ */
+suite("flink_connector_response") {
+
+    def LABEL_EXIST_PATTERN =
+            Pattern.compile("Label \\[(.*)\\] has already been used, relate to 
txn \\[(\\d+)\\]");
+    def COMMITTED_PATTERN =
+            Pattern.compile(
+                    "transaction \\[(\\d+)\\] is already 
\\b(COMMITTED|committed|VISIBLE|visible)\\b, not pre-committed.");
+    def ABORTTED_PATTERN =
+            Pattern.compile(
+                    "transaction \\[(\\d+)\\] is already|transaction 
\\[(\\d+)\\] not found");
+
+    def thisDb = sql """select database()""";
+    thisDb = thisDb[0][0];
+    logger.info("current database is ${thisDb}");
+
+    def tableName = "test_response"
+    sql """DROP TABLE IF EXISTS ${tableName}"""
+
+    sql """
+        CREATE TABLE `${tableName}` (
+            `id` int,
+            `name` string
+        )
+DUPLICATE KEY(`id`)
+DISTRIBUTED BY HASH(`id`) BUCKETS 1
+PROPERTIES (
+"replication_num" = "1",
+"light_schema_change" = "true"
+);
+    """;
+
+    def execute_stream_load_cmd = {label ->
+        def filePath = 
"${context.config.dataPath}/flink_connector_p0/test_response.csv"
+        StringBuilder strBuilder = new StringBuilder()
+        strBuilder.append("""curl --location-trusted -u """ + 
context.config.feHttpUser + ":" + context.config.feHttpPassword)
+        strBuilder.append(""" -H two_phase_commit:true """)
+        strBuilder.append(""" -H column_separator:, """)
+        strBuilder.append(""" -H expect:100-continue """)
+        strBuilder.append(""" -H label:""" + label)
+        strBuilder.append(""" -T """ + filePath)
+        strBuilder.append(""" http://"""; + context.config.feHttpAddress + 
"""/api/${thisDb}/${tableName}/_stream_load""")
+
+        String command = strBuilder.toString()
+        logger.info("streamload command=" + command)
+        def process = command.toString().execute()
+        process.waitFor()
+        def out = process.getText()
+        println(out)
+        out
+    }
+
+    def execute_commit_cmd = {txnId ->
+        StringBuilder strBuilder = new StringBuilder()
+        strBuilder.append("""curl -X PUT --location-trusted -u """ + 
context.config.feHttpUser + ":" + context.config.feHttpPassword)
+        strBuilder.append(""" -H txn_id:""" + txnId)
+        strBuilder.append(""" -H txn_operation:commit""")
+        strBuilder.append(""" http://"""; + context.config.feHttpAddress + 
"""/api/${thisDb}/${tableName}/_stream_load_2pc""")
+
+        String command = strBuilder.toString()
+        logger.info("streamload command=" + command)
+        def processCommit = command.toString().execute()
+        processCommit.waitFor()
+        def outCommit = processCommit.getText()
+        println(outCommit)
+        outCommit
+    }
+
+    def execute_abort_cmd = {txnId ->
+        StringBuilder strBuilder = new StringBuilder()
+        strBuilder.append("""curl -X PUT --location-trusted -u """ + 
context.config.feHttpUser + ":" + context.config.feHttpPassword)
+        strBuilder.append(""" -H txn_id:""" + txnId)
+        strBuilder.append(""" -H txn_operation:abort""")
+        strBuilder.append(""" http://"""; + context.config.feHttpAddress + 
"""/api/${thisDb}/${tableName}/_stream_load_2pc""")
+
+        String command = strBuilder.toString()
+        logger.info("streamload command=" + command)
+        def processAbort = command.toString().execute()
+        processAbort.waitFor()
+        def outAbort = processAbort.getText()
+        println(outAbort)
+        outAbort
+    }
+
+    //normal stream load
+    def uuid = UUID.randomUUID().toString().replaceAll("-", "");
+    String out = execute_stream_load_cmd(uuid)
+    def jsonout = parseJson(out);
+    assertEquals(jsonout.Status, "Success")
+    assertEquals(jsonout.Label, uuid)
+    assertEquals(jsonout.NumberTotalRows, 2)
+    def txnid = jsonout.TxnId
+
+    //label exist load
+    println("start label exists load")
+    def outLabelExist = execute_stream_load_cmd(uuid)
+    def outLabelExistJson = parseJson(outLabelExist);
+    assertEquals(outLabelExistJson.Status, "Label Already Exists")
+    assertTrue(LABEL_EXIST_PATTERN.matcher(outLabelExistJson.Message).find());
+
+    def no_exist_txnid = Long.MAX_VALUE;
+    //abort txnid
+    println("start abort txnid")
+    def outAbort = execute_abort_cmd(txnid)
+    jsonout = parseJson(outAbort);
+    assertEquals(jsonout.status, "Success")
+
+    //abort not exist txnid
+    println("start abort not exist txnid")
+    outAbort = execute_abort_cmd(no_exist_txnid)
+    jsonout = parseJson(outAbort)
+    assertNotEquals(jsonout.status, "Success")
+    assertTrue(ABORTTED_PATTERN.matcher(jsonout.msg).find())
+
+    //abort already abort txnid
+    println("start abort already abort txnid")
+    outAbort = execute_abort_cmd(txnid);
+    jsonout = parseJson(outAbort)
+    assertNotEquals(jsonout.status, "Success")
+    assertTrue(ABORTTED_PATTERN.matcher(jsonout.msg).find())
+
+    //commit not exist txnid
+    println("start commit not exist txnid")
+    def outCommit = execute_commit_cmd(no_exist_txnid)
+    jsonout = parseJson(outAbort)
+    assertNotEquals(jsonout.status, "Success")
+    assertTrue(ABORTTED_PATTERN.matcher(jsonout.msg).find())
+
+    //commit already abort txnid
+    println("start commit already abort txnid")
+    outCommit = execute_commit_cmd(txnid)
+    jsonout = parseJson(outCommit)
+    assertNotEquals(jsonout.status, "Success")
+    assertTrue(ABORTTED_PATTERN.matcher(jsonout.msg).find())
+
+
+    //start new normal stream load
+    println("start new stream load")
+    uuid = UUID.randomUUID().toString().replaceAll("-", "");
+    out = execute_stream_load_cmd(uuid)
+    jsonout = parseJson(out);
+    assertEquals(jsonout.Status, "Success")
+    assertEquals(jsonout.Label, uuid)
+    assertEquals(jsonout.NumberTotalRows, 2)
+    //commit txnid
+    def commitTxnId = jsonout.TxnId
+    out = execute_commit_cmd(commitTxnId)
+    jsonout = parseJson(out);
+    assertEquals(jsonout.status, "Success")
+
+    //commit already commit txnid
+    println("start commit already commit txnid")
+    outCommit = execute_commit_cmd(commitTxnId)
+    jsonout = parseJson(outCommit)
+    assertNotEquals(jsonout.status, "Success")
+    assertTrue(COMMITTED_PATTERN.matcher(jsonout.msg).find())
+
+    //abort already commit txnid
+    println("start abort already commit txnid")
+    outAbort = execute_abort_cmd(commitTxnId)
+    jsonout = parseJson(outAbort)
+    assertNotEquals(jsonout.status, "Success")
+    assertTrue(ABORTTED_PATTERN.matcher(jsonout.msg).find())
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to