This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 4dd53799517 [bugfix](hive)fix error for writing to hive for 2.1 (#34518)
4dd53799517 is described below

commit 4dd537995172ebab812bf3c694e5b2d5a89c0fee
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Tue May 14 23:27:29 2024 +0800

    [bugfix](hive)fix error for writing to hive for 2.1 (#34518)
    
    mirror #34520
---
 be/src/io/file_factory.cpp                         |  13 +++
 be/src/vec/runtime/vorc_transformer.cpp            |  12 ++-
 .../hive/write/test_hive_write_different_path.out  |  21 +++++
 .../write/test_hive_write_different_path.groovy    | 102 +++++++++++++++++++++
 4 files changed, 144 insertions(+), 4 deletions(-)

diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index aa38e220199..777a3505701 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -94,6 +94,19 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
     case TFileType::FILE_HDFS: {
         THdfsParams hdfs_params = parse_properties(properties);
         std::shared_ptr<io::HdfsFileSystem> fs;
+        // If the destination path contains a scheme, use that scheme directly;
+        // otherwise, fall back to defaultFS.
+        // Passing the wrong fs_name causes a write error.
+        // Examples:
+        //    hdfs://host:port/path1/path2   --> hdfs://host:port
+        //    hdfs://nameservice/path1/path2 --> hdfs://nameservice
+        string::size_type idx = path.find("://");
+        if (idx != string::npos) {
+            idx = path.find("/", idx + 3);
+            if (idx != string::npos) {
+                hdfs_params.fs_name = path.substr(0, idx);
+            }
+        }
         RETURN_IF_ERROR(
                io::HdfsFileSystem::create(hdfs_params, "", hdfs_params.fs_name, nullptr, &fs));
         RETURN_IF_ERROR(fs->create_file(path, &file_writer, opts));
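
For reference, here is a minimal standalone sketch of the fs_name extraction added above. The helper name extract_fs_name and the sample port are illustrative assumptions, not part of the patch:

    #include <iostream>
    #include <string>

    // Returns the "scheme://authority" prefix embedded in path, or an empty
    // string when path carries no scheme; in the empty case the caller keeps
    // the defaultFS already parsed into hdfs_params.fs_name.
    static std::string extract_fs_name(const std::string& path) {
        std::string::size_type idx = path.find("://");
        if (idx != std::string::npos) {
            idx = path.find('/', idx + 3);
            if (idx != std::string::npos) {
                return path.substr(0, idx);
            }
        }
        return "";
    }

    int main() {
        // Mirrors the examples in the comment above.
        std::cout << extract_fs_name("hdfs://host:8020/path1/path2") << "\n";   // hdfs://host:8020
        std::cout << extract_fs_name("hdfs://nameservice/path1/path2") << "\n"; // hdfs://nameservice
        std::cout << extract_fs_name("/user/hive/warehouse") << "\n";           // empty: use defaultFS
        return 0;
    }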
diff --git a/be/src/vec/runtime/vorc_transformer.cpp b/be/src/vec/runtime/vorc_transformer.cpp
index 7cd5b9ad10d..ab92a7be543 100644
--- a/be/src/vec/runtime/vorc_transformer.cpp
+++ b/be/src/vec/runtime/vorc_transformer.cpp
@@ -91,7 +91,10 @@ void VOrcOutputStream::write(const void* data, size_t length) {
        Status st = _file_writer->append({static_cast<const uint8_t*>(data), length});
         if (!st.ok()) {
             LOG(WARNING) << "Write to ORC file failed: " << st;
-            return;
+            // When a write error occurs, the error needs to be thrown
+            // up to the upper layer,
+            // so that the FE can catch the exception.
+            throw std::runtime_error(st.to_string());
         }
         _cur_pos += length;
         _written_len += length;
@@ -148,9 +151,10 @@ Status VOrcTransformer::open() {
     }
 
     _output_stream = std::make_unique<VOrcOutputStream>(_file_writer);
-    _writer = orc::createWriter(*_schema, _output_stream.get(), *_write_options);
-    if (_writer == nullptr) {
-        return Status::InternalError("Failed to create file writer");
+    try {
+        _writer = orc::createWriter(*_schema, _output_stream.get(), *_write_options);
+    } catch (const std::exception& e) {
+        return Status::InternalError("failed to create writer: {}", e.what());
     }
     return Status::OK();
 }
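
The two hunks above establish one error-propagation path: write() must throw, because the ORC library drives that callback and offers no Status return channel, and open() converts any exception from the ORC layer back into a Status that the FE can report. A minimal sketch of the pattern, with a simplified stand-in for Doris' Status and hypothetical function names:

    #include <iostream>
    #include <stdexcept>
    #include <string>

    // Simplified stand-in for Doris' Status type.
    struct Status {
        bool ok;
        std::string msg;
        static Status OK() { return {true, ""}; }
        static Status InternalError(std::string m) { return {false, std::move(m)}; }
    };

    // Stand-in for VOrcOutputStream::write(): on a failed append the only
    // way to reach the caller is to throw.
    void write_or_throw(bool append_ok) {
        if (!append_ok) {
            throw std::runtime_error("Write to ORC file failed");
        }
    }

    // Stand-in for VOrcTransformer::open(): catch whatever the ORC layer
    // throws and surface it as a Status.
    Status open_writer(bool simulate_failure) {
        try {
            write_or_throw(!simulate_failure);
        } catch (const std::exception& e) {
            return Status::InternalError(std::string("failed to create writer: ") + e.what());
        }
        return Status::OK();
    }

    int main() {
        Status st = open_writer(/*simulate_failure=*/true);
        std::cout << (st.ok ? "ok" : st.msg) << "\n";
        return 0;
    }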
diff --git a/regression-test/data/external_table_p0/hive/write/test_hive_write_different_path.out b/regression-test/data/external_table_p0/hive/write/test_hive_write_different_path.out
new file mode 100644
index 00000000000..626a46763ea
--- /dev/null
+++ b/regression-test/data/external_table_p0/hive/write/test_hive_write_different_path.out
@@ -0,0 +1,21 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q01 --
+1      a       a       1
+2      b       a       1
+3      c       a       1
+
+-- !q02 --
+4      d       a       1
+5      e       a       1
+6      f       a       1
+
+-- !q01 --
+1      a       a       1
+2      b       a       1
+3      c       a       1
+
+-- !q02 --
+4      d       a       1
+5      e       a       1
+6      f       a       1
+
diff --git a/regression-test/suites/external_table_p0/hive/write/test_hive_write_different_path.groovy b/regression-test/suites/external_table_p0/hive/write/test_hive_write_different_path.groovy
new file mode 100644
index 00000000000..744e474539c
--- /dev/null
+++ b/regression-test/suites/external_table_p0/hive/write/test_hive_write_different_path.groovy
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hive_write_different_path", "p0,external,hive,external_docker,external_docker_hive") {
+
+    for (String hivePrefix : ["hive2", "hive3"]) {
+
+        String enabled = context.config.otherConfigs.get("enableHiveTest")
+        if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+            logger.info("Hive test is disabled.")
+            return;
+        }
+
+        setHivePrefix(hivePrefix)
+        try {
+            String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
+            String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort")
+            String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+            String catalog1 = "test_${hivePrefix}_write_insert_without_defaultfs"
+            String catalog2 = "test_${hivePrefix}_write_insert_with_external_ip"
+            String catalog3 = "test_${hivePrefix}_write_insert_with_local_ip"
+            String localEnvIp = "127.0.0.1"
+
+            sql """drop catalog if exists ${catalog1}"""
+            sql """drop catalog if exists ${catalog2}"""
+            sql """drop catalog if exists ${catalog3}"""
+            sql """create catalog if not exists ${catalog1} properties (
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
+            );"""
+
+            sql """ use ${catalog1}.write_test """
+            sql """ drop table if exists tb_with_external_ip """
+            sql """ drop table if exists tb_with_local_ip """
+
+            sql """
+              CREATE TABLE `tb_with_external_ip`
+              (
+                `col_bigint_undef_signed` BIGINT NULL,   
+                `col_varchar_10__undef_signed` VARCHAR(10) NULL,   
+                `col_varchar_64__undef_signed` VARCHAR(64) NULL,   
+                `pk` INT NULL ) properties (
+                'location' = 'hdfs://${externalEnvIp}:${hdfs_port}/user/hive/warehouse/write_test.db/tb_with_external_ip/'
+              ); 
+            """
+
+            sql """
+              CREATE TABLE `tb_with_local_ip`
+              (
+                `col_bigint_undef_signed` BIGINT NULL,   
+                `col_varchar_10__undef_signed` VARCHAR(10) NULL,   
+                `col_varchar_64__undef_signed` VARCHAR(64) NULL,   
+                `pk` INT NULL ) properties (
+                'location' = 'hdfs://${localEnvIp}:${hdfs_port}/user/hive/warehouse/write_test.db/tb_with_local_ip/'
+              ); 
+            """
+            
+            sql """create catalog if not exists ${catalog2} properties (
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
+                'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}'
+            );"""
+            sql """create catalog if not exists ${catalog3} properties (
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
+                'fs.defaultFS' = 'hdfs://${localEnvIp}:${hdfs_port}'
+            );"""
+
+            sql """ insert into ${catalog1}.write_test.tb_with_external_ip 
values (1,'a','a',1) """
+            sql """ insert into ${catalog2}.write_test.tb_with_external_ip 
values (2,'b','a',1) """
+            sql """ insert into ${catalog3}.write_test.tb_with_external_ip 
values (3,'c','a',1) """
+            sql """ insert into ${catalog1}.write_test.tb_with_local_ip values 
(4,'d','a',1) """
+            sql """ insert into ${catalog2}.write_test.tb_with_local_ip values 
(5,'e','a',1) """
+            sql """ insert into ${catalog3}.write_test.tb_with_local_ip values 
(6,'f','a',1) """
+
+            qt_q01 """ select * from ${catalog1}.write_test.tb_with_external_ip order by col_bigint_undef_signed """
+            qt_q02 """ select * from ${catalog1}.write_test.tb_with_local_ip order by col_bigint_undef_signed """
+
+            sql """drop catalog if exists ${catalog1}"""
+            sql """drop catalog if exists ${catalog2}"""
+            sql """drop catalog if exists ${catalog3}"""
+
+        } finally {
+        }
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
