This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 765f1b6efe [Refactor](load) Extract load public code (#22304)
765f1b6efe is described below

commit 765f1b6efe81d3d2c3a3d06846f0623f07104942
Author: zzzzzzzs <1443539...@qq.com>
AuthorDate: Sat Jul 29 12:56:31 2023 +0800

    [Refactor](load) Extract load public code (#22304)
---
 be/src/http/action/stream_load.cpp |  66 ++---------------
 be/src/util/load_util.cpp          |  84 ++++++++++++++++++++++
 be/src/util/load_util.h            |  33 +++++++++
 be/test/util/load_util_test.cpp    | 143 +++++++++++++++++++++++++++++++++++++
 4 files changed, 264 insertions(+), 62 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 30fa4f81a0..588d887d20 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -60,6 +60,7 @@
 #include "runtime/stream_load/stream_load_recorder.h"
 #include "util/byte_buffer.h"
 #include "util/doris_metrics.h"
+#include "util/load_util.h"
 #include "util/metrics.h"
 #include "util/string_util.h"
 #include "util/thrift_rpc_helper.h"
@@ -78,65 +79,6 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit
 TStreamLoadPutResult k_stream_load_put_result;
 #endif
 
-static void parse_format(const std::string& format_str, const std::string& compress_type_str,
-                         TFileFormatType::type* format_type,
-                         TFileCompressType::type* compress_type) {
-    if (format_str.empty()) {
-        parse_format("CSV", compress_type_str, format_type, compress_type);
-        return;
-    }
-    *compress_type = TFileCompressType::PLAIN;
-    *format_type = TFileFormatType::FORMAT_UNKNOWN;
-    if (iequal(format_str, "CSV")) {
-        if (compress_type_str.empty()) {
-            *format_type = TFileFormatType::FORMAT_CSV_PLAIN;
-        } else if (iequal(compress_type_str, "GZ")) {
-            *format_type = TFileFormatType::FORMAT_CSV_GZ;
-            *compress_type = TFileCompressType::GZ;
-        } else if (iequal(compress_type_str, "LZO")) {
-            *format_type = TFileFormatType::FORMAT_CSV_LZO;
-            *compress_type = TFileCompressType::LZO;
-        } else if (iequal(compress_type_str, "BZ2")) {
-            *format_type = TFileFormatType::FORMAT_CSV_BZ2;
-            *compress_type = TFileCompressType::BZ2;
-        } else if (iequal(compress_type_str, "LZ4")) {
-            *format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME;
-            *compress_type = TFileCompressType::LZ4FRAME;
-        } else if (iequal(compress_type_str, "LZOP")) {
-            *format_type = TFileFormatType::FORMAT_CSV_LZOP;
-            *compress_type = TFileCompressType::LZO;
-        } else if (iequal(compress_type_str, "DEFLATE")) {
-            *format_type = TFileFormatType::FORMAT_CSV_DEFLATE;
-            *compress_type = TFileCompressType::DEFLATE;
-        }
-    } else if (iequal(format_str, "JSON")) {
-        if (compress_type_str.empty()) {
-            *format_type = TFileFormatType::FORMAT_JSON;
-        }
-    } else if (iequal(format_str, "PARQUET")) {
-        *format_type = TFileFormatType::FORMAT_PARQUET;
-    } else if (iequal(format_str, "ORC")) {
-        *format_type = TFileFormatType::FORMAT_ORC;
-    }
-    return;
-}
-
-static bool is_format_support_streaming(TFileFormatType::type format) {
-    switch (format) {
-    case TFileFormatType::FORMAT_CSV_PLAIN:
-    case TFileFormatType::FORMAT_CSV_BZ2:
-    case TFileFormatType::FORMAT_CSV_DEFLATE:
-    case TFileFormatType::FORMAT_CSV_GZ:
-    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
-    case TFileFormatType::FORMAT_CSV_LZO:
-    case TFileFormatType::FORMAT_CSV_LZOP:
-    case TFileFormatType::FORMAT_JSON:
-        return true;
-    default:
-        return false;
-    }
-}
-
 StreamLoadAction::StreamLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) {
     _stream_load_entity =
             DorisMetrics::instance()->metric_registry()->register_entity("stream_load");
@@ -290,8 +232,8 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
         //treat as CSV
         format_str = BeConsts::CSV;
     }
-    parse_format(format_str, http_req->header(HTTP_COMPRESS_TYPE), &ctx->format,
-                 &ctx->compress_type);
+    LoadUtil::parse_format(format_str, http_req->header(HTTP_COMPRESS_TYPE), &ctx->format,
+                           &ctx->compress_type);
     if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
         return Status::InternalError("unknown data format, format={}",
                                      http_req->header(HTTP_FORMAT_KEY));
@@ -392,7 +334,7 @@ void StreamLoadAction::free_handler_ctx(std::shared_ptr<void> param) {
 Status StreamLoadAction::_process_put(HttpRequest* http_req,
                                       std::shared_ptr<StreamLoadContext> ctx) {
     // Now we use stream
-    ctx->use_streaming = is_format_support_streaming(ctx->format);
+    ctx->use_streaming = LoadUtil::is_format_support_streaming(ctx->format);
 
     // put request
     TStreamLoadPutRequest request;
diff --git a/be/src/util/load_util.cpp b/be/src/util/load_util.cpp
new file mode 100644
index 0000000000..8736561db4
--- /dev/null
+++ b/be/src/util/load_util.cpp
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/load_util.h"
+
+#include <string>
+
+#include "util/string_util.h"
+
+namespace doris {
+void LoadUtil::parse_format(const std::string& format_str, const std::string& compress_type_str,
+                            TFileFormatType::type* format_type,
+                            TFileCompressType::type* compress_type) {
+    if (format_str.empty()) {
+        parse_format("CSV", compress_type_str, format_type, compress_type);
+        return;
+    }
+    *compress_type = TFileCompressType::PLAIN;
+    *format_type = TFileFormatType::FORMAT_UNKNOWN;
+    if (iequal(format_str, "CSV")) {
+        if (compress_type_str.empty()) {
+            *format_type = TFileFormatType::FORMAT_CSV_PLAIN;
+        } else if (iequal(compress_type_str, "GZ")) {
+            *format_type = TFileFormatType::FORMAT_CSV_GZ;
+            *compress_type = TFileCompressType::GZ;
+        } else if (iequal(compress_type_str, "LZO")) {
+            *format_type = TFileFormatType::FORMAT_CSV_LZO;
+            *compress_type = TFileCompressType::LZO;
+        } else if (iequal(compress_type_str, "BZ2")) {
+            *format_type = TFileFormatType::FORMAT_CSV_BZ2;
+            *compress_type = TFileCompressType::BZ2;
+        } else if (iequal(compress_type_str, "LZ4")) {
+            *format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME;
+            *compress_type = TFileCompressType::LZ4FRAME;
+        } else if (iequal(compress_type_str, "LZOP")) {
+            *format_type = TFileFormatType::FORMAT_CSV_LZOP;
+            *compress_type = TFileCompressType::LZO;
+        } else if (iequal(compress_type_str, "DEFLATE")) {
+            *format_type = TFileFormatType::FORMAT_CSV_DEFLATE;
+            *compress_type = TFileCompressType::DEFLATE;
+        }
+    } else if (iequal(format_str, "JSON")) {
+        if (compress_type_str.empty()) {
+            *format_type = TFileFormatType::FORMAT_JSON;
+        }
+    } else if (iequal(format_str, "PARQUET")) {
+        *format_type = TFileFormatType::FORMAT_PARQUET;
+    } else if (iequal(format_str, "ORC")) {
+        *format_type = TFileFormatType::FORMAT_ORC;
+    }
+    return;
+}
+
+bool LoadUtil::is_format_support_streaming(TFileFormatType::type format) {
+    switch (format) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+    case TFileFormatType::FORMAT_CSV_BZ2:
+    case TFileFormatType::FORMAT_CSV_DEFLATE:
+    case TFileFormatType::FORMAT_CSV_GZ:
+    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+    case TFileFormatType::FORMAT_CSV_LZO:
+    case TFileFormatType::FORMAT_CSV_LZOP:
+    case TFileFormatType::FORMAT_JSON:
+        return true;
+    default:
+        return false;
+    }
+    return false;
+}
+} // namespace  doris
\ No newline at end of file
diff --git a/be/src/util/load_util.h b/be/src/util/load_util.h
new file mode 100644
index 0000000000..60bd79ab1b
--- /dev/null
+++ b/be/src/util/load_util.h
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/PlanNodes_types.h>
+
+#include <string>
+
+namespace doris {
+class LoadUtil {
+public:
+    static void parse_format(const std::string& format_str, const std::string& compress_type_str,
+                             TFileFormatType::type* format_type,
+                             TFileCompressType::type* compress_type);
+
+    static bool is_format_support_streaming(TFileFormatType::type format);
+};
+} // namespace  doris
\ No newline at end of file
diff --git a/be/test/util/load_util_test.cpp b/be/test/util/load_util_test.cpp
new file mode 100644
index 0000000000..0b4967befb
--- /dev/null
+++ b/be/test/util/load_util_test.cpp
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/load_util.h"
+
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include "gtest/gtest_pred_impl.h"
+
+namespace doris {
+
+class LoadUtilTest : public testing::Test {
+public:
+    LoadUtilTest() {}
+    ~LoadUtilTest() override = default;
+};
+
+TEST_F(LoadUtilTest, StreamingTest) {
+    {
+        EXPECT_TRUE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_CSV_PLAIN));
+        EXPECT_TRUE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_CSV_BZ2));
+        EXPECT_TRUE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_CSV_DEFLATE));
+        EXPECT_TRUE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_CSV_GZ));
+        EXPECT_TRUE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_CSV_LZ4FRAME));
+        EXPECT_TRUE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_CSV_LZO));
+        EXPECT_TRUE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_CSV_LZOP));
+        EXPECT_TRUE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_JSON));
+        EXPECT_FALSE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_PARQUET));
+        EXPECT_FALSE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_ORC));
+        EXPECT_FALSE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_PROTO));
+        EXPECT_FALSE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_JNI));
+        EXPECT_FALSE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_AVRO));
+        EXPECT_FALSE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_UNKNOWN));
+    }
+}
+
+TEST_F(LoadUtilTest, ParseTest) {
+    {
+        // "", ""
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("", "", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_CSV_PLAIN, format_type);
+        EXPECT_EQ(TFileCompressType::PLAIN, compress_type);
+    }
+    {
+        // CSV, GZ
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("CSV", "GZ", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_CSV_GZ, format_type);
+        EXPECT_EQ(TFileCompressType::GZ, compress_type);
+    }
+    {
+        // CSV, LZO
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("CSV", "LZO", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_CSV_LZO, format_type);
+        EXPECT_EQ(TFileCompressType::LZO, compress_type);
+    }
+    {
+        // CSV, BZ2
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("CSV", "BZ2", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_CSV_BZ2, format_type);
+        EXPECT_EQ(TFileCompressType::BZ2, compress_type);
+    }
+    {
+        // CSV, LZ4
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("CSV", "LZ4", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_CSV_LZ4FRAME, format_type);
+        EXPECT_EQ(TFileCompressType::LZ4FRAME, compress_type);
+    }
+    {
+        // CSV, LZOP
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("CSV", "LZOP", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_CSV_LZOP, format_type);
+        EXPECT_EQ(TFileCompressType::LZO, compress_type);
+    }
+    {
+        // CSV, DEFLATE
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("CSV", "DEFLATE", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_CSV_DEFLATE, format_type);
+        EXPECT_EQ(TFileCompressType::DEFLATE, compress_type);
+    }
+    {
+        // JSON, ""
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("JSON", "", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+        EXPECT_EQ(TFileCompressType::PLAIN, compress_type);
+    }
+    {
+        // JSON, GZ
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("JSON", "GZ", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_UNKNOWN, format_type);
+        EXPECT_EQ(TFileCompressType::PLAIN, compress_type);
+    }
+    {
+        // PARQUET, ""
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("PARQUET", "", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_PARQUET, format_type);
+        EXPECT_EQ(TFileCompressType::PLAIN, compress_type);
+    }
+    {
+        // ORC, ""
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("ORC", "", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_ORC, format_type);
+        EXPECT_EQ(TFileCompressType::PLAIN, compress_type);
+    }
+}
+
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to