This is an automated email from the ASF dual-hosted git repository. yangzhg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 7a8fbe5 [internal] [doris-1084] support compressed csv file in stream load (#5463) 7a8fbe5 is described below commit 7a8fbe5db81011e33755a3238fb55f473e42ae80 Author: Zhengguo Yang <yangz...@gmail.com> AuthorDate: Thu Mar 11 10:53:05 2021 +0800 [internal] [doris-1084] support compressed csv file in stream load (#5463) --- be/src/exec/broker_scanner.cpp | 6 ++--- be/src/http/action/stream_load.cpp | 54 +++++++++++++++++++++++++++++--------- be/src/http/http_common.h | 1 + gensrc/thrift/PlanNodes.thrift | 2 +- 4 files changed, 46 insertions(+), 17 deletions(-) diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index f732244..e14f1fb 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -192,7 +192,7 @@ Status BrokerScanner::open_file_reader() { } Status BrokerScanner::create_decompressor(TFileFormatType::type type) { - if (_cur_decompressor == nullptr) { + if (_cur_decompressor != nullptr) { delete _cur_decompressor; _cur_decompressor = nullptr; } @@ -220,7 +220,7 @@ Status BrokerScanner::create_decompressor(TFileFormatType::type type) { break; default: { std::stringstream ss; - ss << "Unknown format type, type=" << type; + ss << "Unknown format type, cannot inference compress type, type=" << type; return Status::InternalError(ss.str()); } } @@ -271,7 +271,7 @@ Status BrokerScanner::open_line_reader() { break; default: { std::stringstream ss; - ss << "Unknown format type, type=" << range.format_type; + ss << "Unknown format type, cannot init line reader, type=" << range.format_type; return Status::InternalError(ss.str()); } } diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index fbcbba5..164676c 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -69,18 +69,46 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit TStreamLoadPutResult k_stream_load_put_result; #endif -static TFileFormatType::type parse_format(const std::string& format_str) { +static TFileFormatType::type parse_format(const std::string& format_str, + const std::string& compress_type) { + if (format_str.empty()) { + return parse_format("CSV", compress_type); + } + TFileFormatType::type format_type = TFileFormatType::FORMAT_UNKNOWN; if (boost::iequals(format_str, "CSV")) { - return TFileFormatType::FORMAT_CSV_PLAIN; + if (compress_type.empty()) { + format_type = TFileFormatType::FORMAT_CSV_PLAIN; + } + if (boost::iequals(compress_type, "GZ")) { + format_type = TFileFormatType::FORMAT_CSV_GZ; + } else if (boost::iequals(compress_type, "LZO")) { + format_type = TFileFormatType::FORMAT_CSV_LZO; + } else if (boost::iequals(compress_type, "BZ2")) { + format_type = TFileFormatType::FORMAT_CSV_BZ2; + } else if (boost::iequals(compress_type, "LZ4FRAME")) { + format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME; + } else if (boost::iequals(compress_type, "LZOP")) { + format_type = TFileFormatType::FORMAT_CSV_LZOP; + } else if (boost::iequals(compress_type, "DEFLATE")) { + format_type = TFileFormatType::FORMAT_CSV_DEFLATE; + } } else if (boost::iequals(format_str, "JSON")) { - return TFileFormatType::FORMAT_JSON; + if (compress_type.empty()) { + format_type = TFileFormatType::FORMAT_JSON; + } } - return TFileFormatType::FORMAT_UNKNOWN; + return format_type; } static bool is_format_support_streaming(TFileFormatType::type format) { switch (format) { case TFileFormatType::FORMAT_CSV_PLAIN: + case TFileFormatType::FORMAT_CSV_BZ2: + case TFileFormatType::FORMAT_CSV_DEFLATE: + case TFileFormatType::FORMAT_CSV_GZ: + case TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZO: + case TFileFormatType::FORMAT_CSV_LZOP: case TFileFormatType::FORMAT_JSON: return true; default: @@ -214,15 +242,15 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct } // get format of this put - if (http_req->header(HTTP_FORMAT_KEY).empty()) { - ctx->format = TFileFormatType::FORMAT_CSV_PLAIN; - } else { - ctx->format = parse_format(http_req->header(HTTP_FORMAT_KEY)); - if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) { - std::stringstream ss; - ss << "unknown data format, format=" << http_req->header(HTTP_FORMAT_KEY); - return Status::InternalError(ss.str()); - } + if (!http_req->header(HTTP_COMPRESS_TYPE).empty() && boost::iequals(http_req->header(HTTP_FORMAT_KEY), "JSON")) { + return Status::InternalError("compress data of JSON format is not supported."); + } + ctx->format = + parse_format(http_req->header(HTTP_FORMAT_KEY), http_req->header(HTTP_COMPRESS_TYPE)); + if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) { + std::stringstream ss; + ss << "unknown data format, format=" << http_req->header(HTTP_FORMAT_KEY); + return Status::InternalError(ss.str()); } // check content length diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index d9bf046..14f3e17 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -47,6 +47,7 @@ static const std::string HTTP_MERGE_TYPE = "merge_type"; static const std::string HTTP_DELETE_CONDITION = "delete"; static const std::string HTTP_FUNCTION_COLUMN = "function_column"; static const std::string HTTP_SEQUENCE_COL = "sequence_col"; +static const std::string HTTP_COMPRESS_TYPE = "compress_type"; static const std::string HTTP_100_CONTINUE = "100-continue"; diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 96e1212..6dbdad2 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -105,7 +105,7 @@ enum TFileFormatType { FORMAT_PARQUET, FORMAT_CSV_DEFLATE, FORMAT_ORC, - FORMAT_JSON + FORMAT_JSON, } // One broker range information. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org