This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5a55e47acd7 [Enhancement](Load) stream tvf support two phase commit (#23800) 5a55e47acd7 is described below commit 5a55e47acd7ebcf02945e006ffb395fadef08cf5 Author: zzzzzzzs <1443539...@qq.com> AuthorDate: Mon Oct 9 14:15:56 2023 +0800 [Enhancement](Load) stream tvf support two phase commit (#23800) --- be/src/http/action/http_stream.cpp | 15 +++- .../ExternalFileTableValuedFunction.java | 2 +- .../load_p0/http_stream/test_http_stream_2pc.out | 14 ++++ .../http_stream/test_http_stream_2pc.groovy | 80 ++++++++++++++++++++++ 4 files changed, 107 insertions(+), 4 deletions(-) diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index e215543e1ed..2a2ddd8ad44 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -145,9 +145,16 @@ Status HttpStreamAction::_handle(HttpRequest* http_req, std::shared_ptr<StreamLo return Status::OK(); } - int64_t commit_and_publish_start_time = MonotonicNanos(); - RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get())); - ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time; + if (ctx->two_phase_commit) { + int64_t pre_commit_start_time = MonotonicNanos(); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get())); + ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time; + } else { + // If put file success we need commit this load + int64_t commit_and_publish_start_time = MonotonicNanos(); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get())); + ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time; + } return Status::OK(); } @@ -162,6 +169,8 @@ int HttpStreamAction::on_header(HttpRequest* req) { ctx->group_commit = iequal(req->header(HTTP_GROUP_COMMIT), "true"); + ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? 
true : false; + LOG(INFO) << "new income streaming load request." << ctx->brief() << " sql : " << req->header(HTTP_SQL); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index 6f2ae5d61d8..648011ff6dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -179,7 +179,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); copiedProps.putAll(properties); - String formatString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FORMAT, ""); + String formatString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FORMAT, "").toLowerCase(); String defaultColumnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR; switch (formatString) { case "csv": diff --git a/regression-test/data/load_p0/http_stream/test_http_stream_2pc.out b/regression-test/data/load_p0/http_stream/test_http_stream_2pc.out new file mode 100644 index 00000000000..83d23a5c0c2 --- /dev/null +++ b/regression-test/data/load_p0/http_stream/test_http_stream_2pc.out @@ -0,0 +1,14 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +10000 aa +10001 bb +10002 cc +10003 dd +10004 ee +10005 ff +10006 gg +10007 hh +10008 ii +10009 jj +10010 kk + diff --git a/regression-test/suites/load_p0/http_stream/test_http_stream_2pc.groovy b/regression-test/suites/load_p0/http_stream/test_http_stream_2pc.groovy new file mode 100644 index 00000000000..86466ba2726 --- /dev/null +++ b/regression-test/suites/load_p0/http_stream/test_http_stream_2pc.groovy @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_http_stream_2pc", "p0") { + + + // 1. 
test two phase commit + def tableName1 = "test_http_stream_2pc" + def db = "regression_test_load_p0_http_stream" + try { + sql """ + CREATE TABLE IF NOT EXISTS ${tableName1} ( + id int, + name CHAR(10), + dt_1 DATETIME DEFAULT CURRENT_TIMESTAMP, + dt_2 DATETIMEV2 DEFAULT CURRENT_TIMESTAMP, + dt_3 DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP, + dt_4 DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + def json + streamLoad { + set 'version', '1' + set 'two_phase_commit', 'true' + set 'sql', """ + insert into ${db}.${tableName1} (id, name) select c1, c2 from http_stream("format"="csv") + """ + time 10000 + file 'test_http_stream.csv' + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("http_stream result: ${result}".toString()) + json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals("true", json.TwoPhaseCommit.toLowerCase()) + assertEquals(11, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + } + } + + def command = "curl -X PUT --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword} -H txn_id:${json.TxnId} -H txn_operation:commit http://${context.config.feHttpAddress}/api/${db}/${tableName1}/_stream_load_2pc" + log.info("http_stream execute 2pc: ${command}") + + def process = command.execute() + code = process.waitFor() + out = process.text + json2pc = parseJson(out) + log.info("http_stream 2pc result: ${out}".toString()) + assertEquals(code, 0) + assertEquals("success", json2pc.status.toLowerCase()) + + qt_sql "select id, name from ${tableName1} order by id" + } finally { + try_sql "DROP TABLE IF EXISTS ${tableName1}" + } +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org