This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fa0ad568176 [exec](compress) use FragmentTransmissionCompressionCodec 
control the exchange compress behavior (#28818)
fa0ad568176 is described below

commit fa0ad568176827c2774bde0bb83b0be296eb541b
Author: HappenLee <[email protected]>
AuthorDate: Fri Dec 22 19:50:57 2023 +0800

    [exec](compress) use FragmentTransmissionCompressionCodec control the 
exchange compress behavior (#28818)
---
 be/src/common/config.cpp                                          | 2 --
 be/src/common/config.h                                            | 2 --
 be/src/pipeline/exec/exchange_sink_operator.cpp                   | 2 +-
 be/src/pipeline/exec/exchange_sink_operator.h                     | 2 +-
 be/src/runtime/runtime_state.h                                    | 6 +++++-
 be/src/vec/core/block.cpp                                         | 2 +-
 be/src/vec/sink/vdata_stream_sender.h                             | 2 +-
 be/test/vec/core/block_test.cpp                                   | 2 --
 fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java | 2 +-
 9 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index ecc44a08e47..11921eac8b5 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -222,8 +222,6 @@ DEFINE_Int32(be_service_threads, "64");
 // or 3x the number of cores.  This keeps the cores busy without causing 
excessive
 // thrashing.
 DEFINE_Int32(num_threads_per_core, "3");
-// if true, compresses tuple data in Serialize
-DEFINE_mBool(compress_rowbatches, "true");
 DEFINE_mBool(rowbatch_align_tuple_offset, "false");
 // interval between profile reports; in seconds
 DEFINE_mInt32(status_report_interval, "5");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index a9508c6e8af..c73637200af 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -271,8 +271,6 @@ DECLARE_Int32(be_service_threads);
 // or 3x the number of cores.  This keeps the cores busy without causing 
excessive
 // thrashing.
 DECLARE_Int32(num_threads_per_core);
-// if true, compresses tuple data in Serialize
-DECLARE_mBool(compress_rowbatches);
 DECLARE_mBool(rowbatch_align_tuple_offset);
 // interval between profile reports; in seconds
 DECLARE_mInt32(status_report_interval);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 548680a25cd..43bec0bd92d 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -252,7 +252,7 @@ std::string ExchangeSinkLocalState::name_suffix() {
     return name;
 }
 
-segment_v2::CompressionTypePB& ExchangeSinkLocalState::compression_type() {
+segment_v2::CompressionTypePB ExchangeSinkLocalState::compression_type() const 
{
     return _parent->cast<ExchangeSinkOperatorX>()._compression_type;
 }
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 70ae126fca7..5df03ea7773 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -145,7 +145,7 @@ public:
     [[nodiscard]] int sender_id() const { return _sender_id; }
 
     std::string name_suffix() override;
-    segment_v2::CompressionTypePB& compression_type();
+    segment_v2::CompressionTypePB compression_type() const;
     std::string debug_string(int indentation_level) const override;
 
     std::vector<vectorized::PipChannel<ExchangeSinkLocalState>*> channels;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 989655a36c7..6ced139eb4c 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -393,9 +393,13 @@ public:
         if (_query_options.__isset.fragment_transmission_compression_codec) {
             if (_query_options.fragment_transmission_compression_codec == 
"lz4") {
                 return segment_v2::CompressionTypePB::LZ4;
+            } else if (_query_options.fragment_transmission_compression_codec 
== "snappy") {
+                return segment_v2::CompressionTypePB::SNAPPY;
+            } else {
+                return segment_v2::CompressionTypePB::NO_COMPRESSION;
             }
         }
-        return segment_v2::CompressionTypePB::SNAPPY;
+        return segment_v2::CompressionTypePB::NO_COMPRESSION;
     }
 
     bool skip_storage_engine_merge() const {
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index d3d7523c0f3..a7965d03ce3 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -866,7 +866,7 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
     *uncompressed_bytes = content_uncompressed_size;
 
     // compress
-    if (config::compress_rowbatches && content_uncompressed_size > 0) {
+    if (compression_type != segment_v2::NO_COMPRESSION && 
content_uncompressed_size > 0) {
         SCOPED_RAW_TIMER(&_compress_time_ns);
         pblock->set_compression_type(compression_type);
         pblock->set_uncompressed_size(content_uncompressed_size);
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 54822f89689..f59dad266f8 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -149,7 +149,7 @@ public:
     QueryStatisticsPtr query_statisticsPtr() { return _query_statistics; }
     bool transfer_large_data_by_brpc() { return _transfer_large_data_by_brpc; }
     RuntimeProfile::Counter* merge_block_timer() { return _merge_block_timer; }
-    segment_v2::CompressionTypePB& compression_type() { return 
_compression_type; }
+    segment_v2::CompressionTypePB compression_type() const { return 
_compression_type; }
 
 protected:
     friend class BlockSerializer<VDataStreamSender>;
diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp
index 54dd6c1136f..020c4f2e923 100644
--- a/be/test/vec/core/block_test.cpp
+++ b/be/test/vec/core/block_test.cpp
@@ -114,7 +114,6 @@ void fill_block_with_array_string(vectorized::Block& block) 
{
 }
 
 void serialize_and_deserialize_test(segment_v2::CompressionTypePB 
compression_type) {
-    config::compress_rowbatches = true;
     // int
     {
         auto vec = vectorized::ColumnVector<Int32>::create();
@@ -296,7 +295,6 @@ void 
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
 }
 
 TEST(BlockTest, SerializeAndDeserializeBlock) {
-    config::compress_rowbatches = true;
     serialize_and_deserialize_test(segment_v2::CompressionTypePB::SNAPPY);
     serialize_and_deserialize_test(segment_v2::CompressionTypePB::LZ4);
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index b0e20d21ad4..33319717447 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -2720,7 +2720,7 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setEnableFunctionPushdown(enableFunctionPushdown);
         tResult.setEnableCommonExprPushdown(enableCommonExprPushdown);
         tResult.setCheckOverflowForDecimal(checkOverflowForDecimal);
-        
tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec);
+        
tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec.trim().toLowerCase());
         tResult.setEnableLocalExchange(enableLocalExchange);
 
         tResult.setSkipStorageEngineMerge(skipStorageEngineMerge);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to