This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 765f1b6efe [Refactor](load) Extract load public code (#22304) 765f1b6efe is described below commit 765f1b6efe81d3d2c3a3d06846f0623f07104942 Author: zzzzzzzs <1443539...@qq.com> AuthorDate: Sat Jul 29 12:56:31 2023 +0800 [Refactor](load) Extract load public code (#22304) --- be/src/http/action/stream_load.cpp | 66 ++--------------- be/src/util/load_util.cpp | 84 ++++++++++++++++++++++ be/src/util/load_util.h | 33 +++++++++ be/test/util/load_util_test.cpp | 143 +++++++++++++++++++++++++++++++++++++ 4 files changed, 264 insertions(+), 62 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 30fa4f81a0..588d887d20 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -60,6 +60,7 @@ #include "runtime/stream_load/stream_load_recorder.h" #include "util/byte_buffer.h" #include "util/doris_metrics.h" +#include "util/load_util.h" #include "util/metrics.h" #include "util/string_util.h" #include "util/thrift_rpc_helper.h" @@ -78,65 +79,6 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit TStreamLoadPutResult k_stream_load_put_result; #endif -static void parse_format(const std::string& format_str, const std::string& compress_type_str, - TFileFormatType::type* format_type, - TFileCompressType::type* compress_type) { - if (format_str.empty()) { - parse_format("CSV", compress_type_str, format_type, compress_type); - return; - } - *compress_type = TFileCompressType::PLAIN; - *format_type = TFileFormatType::FORMAT_UNKNOWN; - if (iequal(format_str, "CSV")) { - if (compress_type_str.empty()) { - *format_type = TFileFormatType::FORMAT_CSV_PLAIN; - } else if (iequal(compress_type_str, "GZ")) { - *format_type = TFileFormatType::FORMAT_CSV_GZ; - *compress_type = TFileCompressType::GZ; - } else if (iequal(compress_type_str, "LZO")) { - *format_type = TFileFormatType::FORMAT_CSV_LZO; - *compress_type = TFileCompressType::LZO; - } else if (iequal(compress_type_str, "BZ2")) { - *format_type = TFileFormatType::FORMAT_CSV_BZ2; - *compress_type = TFileCompressType::BZ2; - } else if (iequal(compress_type_str, "LZ4")) { - *format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME; - *compress_type = TFileCompressType::LZ4FRAME; - } else if (iequal(compress_type_str, "LZOP")) { - *format_type = TFileFormatType::FORMAT_CSV_LZOP; - *compress_type = TFileCompressType::LZO; - } else if (iequal(compress_type_str, "DEFLATE")) { - *format_type = TFileFormatType::FORMAT_CSV_DEFLATE; - *compress_type = TFileCompressType::DEFLATE; - } - } else if (iequal(format_str, "JSON")) { - if (compress_type_str.empty()) { - *format_type = TFileFormatType::FORMAT_JSON; - } - } else if (iequal(format_str, "PARQUET")) { - *format_type = TFileFormatType::FORMAT_PARQUET; - } else if (iequal(format_str, "ORC")) { - *format_type = TFileFormatType::FORMAT_ORC; - } - return; -} - -static bool is_format_support_streaming(TFileFormatType::type format) { - switch (format) { - case TFileFormatType::FORMAT_CSV_PLAIN: - case TFileFormatType::FORMAT_CSV_BZ2: - case TFileFormatType::FORMAT_CSV_DEFLATE: - case TFileFormatType::FORMAT_CSV_GZ: - case TFileFormatType::FORMAT_CSV_LZ4FRAME: - case TFileFormatType::FORMAT_CSV_LZO: - case TFileFormatType::FORMAT_CSV_LZOP: - case TFileFormatType::FORMAT_JSON: - return true; - default: - return false; - } -} - StreamLoadAction::StreamLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) { _stream_load_entity = DorisMetrics::instance()->metric_registry()->register_entity("stream_load"); @@ -290,8 +232,8 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea //treat as CSV format_str = BeConsts::CSV; } - parse_format(format_str, http_req->header(HTTP_COMPRESS_TYPE), &ctx->format, - &ctx->compress_type); + LoadUtil::parse_format(format_str, http_req->header(HTTP_COMPRESS_TYPE), &ctx->format, + &ctx->compress_type); if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) { return Status::InternalError("unknown data format, format={}", http_req->header(HTTP_FORMAT_KEY)); @@ -392,7 +334,7 @@ void StreamLoadAction::free_handler_ctx(std::shared_ptr<void> param) { Status StreamLoadAction::_process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) { // Now we use stream - ctx->use_streaming = is_format_support_streaming(ctx->format); + ctx->use_streaming = LoadUtil::is_format_support_streaming(ctx->format); // put request TStreamLoadPutRequest request; diff --git a/be/src/util/load_util.cpp b/be/src/util/load_util.cpp new file mode 100644 index 0000000000..8736561db4 --- /dev/null +++ b/be/src/util/load_util.cpp @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/load_util.h" + +#include <string> + +#include "util/string_util.h" + +namespace doris { +void LoadUtil::parse_format(const std::string& format_str, const std::string& compress_type_str, + TFileFormatType::type* format_type, + TFileCompressType::type* compress_type) { + if (format_str.empty()) { + parse_format("CSV", compress_type_str, format_type, compress_type); + return; + } + *compress_type = TFileCompressType::PLAIN; + *format_type = TFileFormatType::FORMAT_UNKNOWN; + if (iequal(format_str, "CSV")) { + if (compress_type_str.empty()) { + *format_type = TFileFormatType::FORMAT_CSV_PLAIN; + } else if (iequal(compress_type_str, "GZ")) { + *format_type = TFileFormatType::FORMAT_CSV_GZ; + *compress_type = TFileCompressType::GZ; + } else if (iequal(compress_type_str, "LZO")) { + *format_type = TFileFormatType::FORMAT_CSV_LZO; + *compress_type = TFileCompressType::LZO; + } else if (iequal(compress_type_str, "BZ2")) { + *format_type = TFileFormatType::FORMAT_CSV_BZ2; + *compress_type = TFileCompressType::BZ2; + } else if (iequal(compress_type_str, "LZ4")) { + *format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME; + *compress_type = TFileCompressType::LZ4FRAME; + } else if (iequal(compress_type_str, "LZOP")) { + *format_type = TFileFormatType::FORMAT_CSV_LZOP; + *compress_type = TFileCompressType::LZO; + } else if (iequal(compress_type_str, "DEFLATE")) { + *format_type = TFileFormatType::FORMAT_CSV_DEFLATE; + *compress_type = TFileCompressType::DEFLATE; + } + } else if (iequal(format_str, "JSON")) { + if (compress_type_str.empty()) { + *format_type = TFileFormatType::FORMAT_JSON; + } + } else if (iequal(format_str, "PARQUET")) { + *format_type = TFileFormatType::FORMAT_PARQUET; + } else if (iequal(format_str, "ORC")) { + *format_type = TFileFormatType::FORMAT_ORC; + } + return; +} + +bool LoadUtil::is_format_support_streaming(TFileFormatType::type format) { + switch (format) { + case TFileFormatType::FORMAT_CSV_PLAIN: + case TFileFormatType::FORMAT_CSV_BZ2: + case TFileFormatType::FORMAT_CSV_DEFLATE: + case TFileFormatType::FORMAT_CSV_GZ: + case TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZO: + case TFileFormatType::FORMAT_CSV_LZOP: + case TFileFormatType::FORMAT_JSON: + return true; + default: + return false; + } + return false; +} +} // namespace doris \ No newline at end of file diff --git a/be/src/util/load_util.h b/be/src/util/load_util.h new file mode 100644 index 0000000000..60bd79ab1b --- /dev/null +++ b/be/src/util/load_util.h @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/PlanNodes_types.h> + +#include <string> + +namespace doris { +class LoadUtil { +public: + static void parse_format(const std::string& format_str, const std::string& compress_type_str, + TFileFormatType::type* format_type, + TFileCompressType::type* compress_type); + + static bool is_format_support_streaming(TFileFormatType::type format); +}; +} // namespace doris \ No newline at end of file diff --git a/be/test/util/load_util_test.cpp b/be/test/util/load_util_test.cpp new file mode 100644 index 0000000000..0b4967befb --- /dev/null +++ b/be/test/util/load_util_test.cpp @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/load_util.h" + +#include <gtest/gtest-message.h> +#include <gtest/gtest-test-part.h> + +#include "gtest/gtest_pred_impl.h" + +namespace doris { + +class LoadUtilTest : public testing::Test { +public: + LoadUtilTest() {} + ~LoadUtilTest() override = default; +}; + +TEST_F(LoadUtilTest, StreamingTest) { + { + EXPECT_TRUE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_CSV_PLAIN)); + EXPECT_TRUE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_CSV_BZ2)); + EXPECT_TRUE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_CSV_DEFLATE)); + EXPECT_TRUE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_CSV_GZ)); + EXPECT_TRUE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_CSV_LZ4FRAME)); + EXPECT_TRUE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_CSV_LZO)); + EXPECT_TRUE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_CSV_LZOP)); + EXPECT_TRUE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_JSON)); + EXPECT_FALSE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_PARQUET)); + EXPECT_FALSE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_ORC)); + EXPECT_FALSE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_PROTO)); + EXPECT_FALSE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_JNI)); + EXPECT_FALSE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_AVRO)); + EXPECT_FALSE(LoadUtil::is_format_support_streaming(TFileFormatType::FORMAT_UNKNOWN)); + } +} + +TEST_F(LoadUtilTest, ParseTest) { + { + // "", "" + TFileFormatType::type format_type; + TFileCompressType::type compress_type; + LoadUtil::parse_format("", "", &format_type, &compress_type); + EXPECT_EQ(TFileFormatType::FORMAT_CSV_PLAIN, format_type); + EXPECT_EQ(TFileCompressType::PLAIN, compress_type); + } + { + // CSV, GZ + TFileFormatType::type format_type; + TFileCompressType::type compress_type; + LoadUtil::parse_format("CSV", "GZ", &format_type, &compress_type); + EXPECT_EQ(TFileFormatType::FORMAT_CSV_GZ, format_type); + EXPECT_EQ(TFileCompressType::GZ, compress_type); + } + { + // CSV, LZO + TFileFormatType::type format_type; + TFileCompressType::type compress_type; + LoadUtil::parse_format("CSV", "LZO", &format_type, &compress_type); + EXPECT_EQ(TFileFormatType::FORMAT_CSV_LZO, format_type); + EXPECT_EQ(TFileCompressType::LZO, compress_type); + } + { + // CSV, BZ2 + TFileFormatType::type format_type; + TFileCompressType::type compress_type; + LoadUtil::parse_format("CSV", "BZ2", &format_type, &compress_type); + EXPECT_EQ(TFileFormatType::FORMAT_CSV_BZ2, format_type); + EXPECT_EQ(TFileCompressType::BZ2, compress_type); + } + { + // CSV, LZ4 + TFileFormatType::type format_type; + TFileCompressType::type compress_type; + LoadUtil::parse_format("CSV", "LZ4", &format_type, &compress_type); + EXPECT_EQ(TFileFormatType::FORMAT_CSV_LZ4FRAME, format_type); + EXPECT_EQ(TFileCompressType::LZ4FRAME, compress_type); + } + { + // CSV, LZOP + TFileFormatType::type format_type; + TFileCompressType::type compress_type; + LoadUtil::parse_format("CSV", "LZOP", &format_type, &compress_type); + EXPECT_EQ(TFileFormatType::FORMAT_CSV_LZOP, format_type); + EXPECT_EQ(TFileCompressType::LZO, compress_type); + } + { + // CSV, DEFLATE + TFileFormatType::type format_type; + TFileCompressType::type compress_type; + LoadUtil::parse_format("CSV", "DEFLATE", &format_type, &compress_type); + EXPECT_EQ(TFileFormatType::FORMAT_CSV_DEFLATE, format_type); + EXPECT_EQ(TFileCompressType::DEFLATE, compress_type); + } + { + // JSON, "" + TFileFormatType::type format_type; + TFileCompressType::type compress_type; + LoadUtil::parse_format("JSON", "", &format_type, &compress_type); + EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type); + EXPECT_EQ(TFileCompressType::PLAIN, compress_type); + } + { + // JSON, GZ + TFileFormatType::type format_type; + TFileCompressType::type compress_type; + LoadUtil::parse_format("JSON", "GZ", &format_type, &compress_type); + EXPECT_EQ(TFileFormatType::FORMAT_UNKNOWN, format_type); + EXPECT_EQ(TFileCompressType::PLAIN, compress_type); + } + { + // PARQUET, "" + TFileFormatType::type format_type; + TFileCompressType::type compress_type; + LoadUtil::parse_format("PARQUET", "", &format_type, &compress_type); + EXPECT_EQ(TFileFormatType::FORMAT_PARQUET, format_type); + EXPECT_EQ(TFileCompressType::PLAIN, compress_type); + } + { + // ORC, "" + TFileFormatType::type format_type; + TFileCompressType::type compress_type; + LoadUtil::parse_format("ORC", "", &format_type, &compress_type); + EXPECT_EQ(TFileFormatType::FORMAT_ORC, format_type); + EXPECT_EQ(TFileCompressType::PLAIN, compress_type); + } +} + +} // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org