This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fa0ad568176 [exec](compress) use FragmentTransmissionCompressionCodec
control the exchange compress behavior (#28818)
fa0ad568176 is described below
commit fa0ad568176827c2774bde0bb83b0be296eb541b
Author: HappenLee <[email protected]>
AuthorDate: Fri Dec 22 19:50:57 2023 +0800
[exec](compress) use FragmentTransmissionCompressionCodec control the
exchange compress behavior (#28818)
---
be/src/common/config.cpp | 2 --
be/src/common/config.h | 2 --
be/src/pipeline/exec/exchange_sink_operator.cpp | 2 +-
be/src/pipeline/exec/exchange_sink_operator.h | 2 +-
be/src/runtime/runtime_state.h | 6 +++++-
be/src/vec/core/block.cpp | 2 +-
be/src/vec/sink/vdata_stream_sender.h | 2 +-
be/test/vec/core/block_test.cpp | 2 --
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java | 2 +-
9 files changed, 10 insertions(+), 12 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index ecc44a08e47..11921eac8b5 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -222,8 +222,6 @@ DEFINE_Int32(be_service_threads, "64");
// or 3x the number of cores. This keeps the cores busy without causing excessive
// thrashing.
DEFINE_Int32(num_threads_per_core, "3");
-// if true, compresses tuple data in Serialize
-DEFINE_mBool(compress_rowbatches, "true");
DEFINE_mBool(rowbatch_align_tuple_offset, "false");
// interval between profile reports; in seconds
DEFINE_mInt32(status_report_interval, "5");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index a9508c6e8af..c73637200af 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -271,8 +271,6 @@ DECLARE_Int32(be_service_threads);
// or 3x the number of cores. This keeps the cores busy without causing excessive
// thrashing.
DECLARE_Int32(num_threads_per_core);
-// if true, compresses tuple data in Serialize
-DECLARE_mBool(compress_rowbatches);
DECLARE_mBool(rowbatch_align_tuple_offset);
// interval between profile reports; in seconds
DECLARE_mInt32(status_report_interval);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 548680a25cd..43bec0bd92d 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -252,7 +252,7 @@ std::string ExchangeSinkLocalState::name_suffix() {
return name;
}
-segment_v2::CompressionTypePB& ExchangeSinkLocalState::compression_type() {
+segment_v2::CompressionTypePB ExchangeSinkLocalState::compression_type() const {
return _parent->cast<ExchangeSinkOperatorX>()._compression_type;
}
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 70ae126fca7..5df03ea7773 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -145,7 +145,7 @@ public:
[[nodiscard]] int sender_id() const { return _sender_id; }
std::string name_suffix() override;
- segment_v2::CompressionTypePB& compression_type();
+ segment_v2::CompressionTypePB compression_type() const;
std::string debug_string(int indentation_level) const override;
std::vector<vectorized::PipChannel<ExchangeSinkLocalState>*> channels;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 989655a36c7..6ced139eb4c 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -393,9 +393,13 @@ public:
if (_query_options.__isset.fragment_transmission_compression_codec) {
if (_query_options.fragment_transmission_compression_codec == "lz4") {
return segment_v2::CompressionTypePB::LZ4;
+ } else if (_query_options.fragment_transmission_compression_codec == "snappy") {
+ return segment_v2::CompressionTypePB::SNAPPY;
+ } else {
+ return segment_v2::CompressionTypePB::NO_COMPRESSION;
}
}
- return segment_v2::CompressionTypePB::SNAPPY;
+ return segment_v2::CompressionTypePB::NO_COMPRESSION;
}
bool skip_storage_engine_merge() const {
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index d3d7523c0f3..a7965d03ce3 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -866,7 +866,7 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
*uncompressed_bytes = content_uncompressed_size;
// compress
- if (config::compress_rowbatches && content_uncompressed_size > 0) {
+ if (compression_type != segment_v2::NO_COMPRESSION && content_uncompressed_size > 0) {
SCOPED_RAW_TIMER(&_compress_time_ns);
pblock->set_compression_type(compression_type);
pblock->set_uncompressed_size(content_uncompressed_size);
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 54822f89689..f59dad266f8 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -149,7 +149,7 @@ public:
QueryStatisticsPtr query_statisticsPtr() { return _query_statistics; }
bool transfer_large_data_by_brpc() { return _transfer_large_data_by_brpc; }
RuntimeProfile::Counter* merge_block_timer() { return _merge_block_timer; }
- segment_v2::CompressionTypePB& compression_type() { return _compression_type; }
+ segment_v2::CompressionTypePB compression_type() const { return _compression_type; }
protected:
friend class BlockSerializer<VDataStreamSender>;
diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp
index 54dd6c1136f..020c4f2e923 100644
--- a/be/test/vec/core/block_test.cpp
+++ b/be/test/vec/core/block_test.cpp
@@ -114,7 +114,6 @@ void fill_block_with_array_string(vectorized::Block& block) {
}
void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_type) {
- config::compress_rowbatches = true;
// int
{
auto vec = vectorized::ColumnVector<Int32>::create();
@@ -296,7 +295,6 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
}
TEST(BlockTest, SerializeAndDeserializeBlock) {
- config::compress_rowbatches = true;
serialize_and_deserialize_test(segment_v2::CompressionTypePB::SNAPPY);
serialize_and_deserialize_test(segment_v2::CompressionTypePB::LZ4);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index b0e20d21ad4..33319717447 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -2720,7 +2720,7 @@ public class SessionVariable implements Serializable, Writable {
tResult.setEnableFunctionPushdown(enableFunctionPushdown);
tResult.setEnableCommonExprPushdown(enableCommonExprPushdown);
tResult.setCheckOverflowForDecimal(checkOverflowForDecimal);
- tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec);
+ tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec.trim().toLowerCase());
tResult.setEnableLocalExchange(enableLocalExchange);
tResult.setSkipStorageEngineMerge(skipStorageEngineMerge);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]