This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a55e47acd7 [Enhancement](Load) stream tvf support two phase commit (#23800)
5a55e47acd7 is described below

commit 5a55e47acd7ebcf02945e006ffb395fadef08cf5
Author: zzzzzzzs <1443539...@qq.com>
AuthorDate: Mon Oct 9 14:15:56 2023 +0800

    [Enhancement](Load) stream tvf support two phase commit (#23800)
---
 be/src/http/action/http_stream.cpp                 | 15 +++-
 .../ExternalFileTableValuedFunction.java           |  2 +-
 .../load_p0/http_stream/test_http_stream_2pc.out   | 14 ++++
 .../http_stream/test_http_stream_2pc.groovy        | 80 ++++++++++++++++++++++
 4 files changed, 107 insertions(+), 4 deletions(-)

diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index e215543e1ed..2a2ddd8ad44 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -145,9 +145,16 @@ Status HttpStreamAction::_handle(HttpRequest* http_req, std::shared_ptr<StreamLo
         return Status::OK();
     }
 
-    int64_t commit_and_publish_start_time = MonotonicNanos();
-    RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
-    ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
+    if (ctx->two_phase_commit) {
+        int64_t pre_commit_start_time = MonotonicNanos();
+        RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
+        ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
+    } else {
+        // If put file success we need commit this load
+        int64_t commit_and_publish_start_time = MonotonicNanos();
+        RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
+        ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
+    }
     return Status::OK();
 }
 
@@ -162,6 +169,8 @@ int HttpStreamAction::on_header(HttpRequest* req) {
 
     ctx->group_commit = iequal(req->header(HTTP_GROUP_COMMIT), "true");
 
+    ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? true : false;
+
     LOG(INFO) << "new income streaming load request." << ctx->brief()
               << " sql : " << req->header(HTTP_SQL);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 6f2ae5d61d8..648011ff6dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -179,7 +179,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
         copiedProps.putAll(properties);
 
-        String formatString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FORMAT, "");
+        String formatString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FORMAT, "").toLowerCase();
         String defaultColumnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR;
         switch (formatString) {
             case "csv":
diff --git a/regression-test/data/load_p0/http_stream/test_http_stream_2pc.out b/regression-test/data/load_p0/http_stream/test_http_stream_2pc.out
new file mode 100644
index 00000000000..83d23a5c0c2
--- /dev/null
+++ b/regression-test/data/load_p0/http_stream/test_http_stream_2pc.out
@@ -0,0 +1,14 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+10000  aa
+10001  bb
+10002  cc
+10003  dd
+10004  ee
+10005  ff
+10006  gg
+10007  hh
+10008  ii
+10009  jj
+10010  kk
+
diff --git a/regression-test/suites/load_p0/http_stream/test_http_stream_2pc.groovy b/regression-test/suites/load_p0/http_stream/test_http_stream_2pc.groovy
new file mode 100644
index 00000000000..86466ba2726
--- /dev/null
+++ b/regression-test/suites/load_p0/http_stream/test_http_stream_2pc.groovy
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_http_stream_2pc", "p0") {
+
+   
+    // 1. test two phase commit
+    def tableName1 = "test_http_stream_2pc"
+    def db = "regression_test_load_p0_http_stream"
+    try {
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName1} (
+            id int,
+            name CHAR(10),
+            dt_1 DATETIME DEFAULT CURRENT_TIMESTAMP,
+            dt_2 DATETIMEV2 DEFAULT CURRENT_TIMESTAMP,
+            dt_3 DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP,
+            dt_4 DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+          "replication_num" = "1"
+        )
+        """
+
+        def json 
+        streamLoad {
+            set 'version', '1'
+            set 'two_phase_commit', 'true'
+            set 'sql', """
+                    insert into ${db}.${tableName1} (id, name) select c1, c2 from http_stream("format"="csv")
+                    """
+            time 10000
+            file 'test_http_stream.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("http_stream result: ${result}".toString())
+                json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals("true", json.TwoPhaseCommit.toLowerCase())
+                assertEquals(11, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+            }
+        }
+
+        def command = "curl -X PUT --location-trusted -u 
${context.config.feHttpUser}:${context.config.feHttpPassword} -H 
txn_id:${json.TxnId} -H txn_operation:commit 
http://${context.config.feHttpAddress}/api/${db}/${tableName1}/_stream_load_2pc";
+        log.info("http_stream execute 2pc: ${command}")
+
+        def process = command.execute()
+        code = process.waitFor()
+        out = process.text
+        json2pc = parseJson(out)
+        log.info("http_stream 2pc result: ${out}".toString())
+        assertEquals(code, 0)
+        assertEquals("success", json2pc.status.toLowerCase())
+
+        qt_sql "select id, name from ${tableName1} order by id"
+    } finally {
+        try_sql "DROP TABLE IF EXISTS ${tableName1}"
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to