This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 8b349155182 [Fix](compress) Fix occasional crushes when serializing 
blocks (#32672)
8b349155182 is described below

commit 8b34915518206ad705d3a6f9ca8cc2a27e3b4e9f
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Sat Mar 23 00:36:34 2024 +0800

    [Fix](compress) Fix occasional crushes when serializing blocks (#32672)
---
 be/src/vec/core/block.cpp                          |  4 +-
 .../main/java/org/apache/doris/catalog/Env.java    | 22 ++++---
 .../java/org/apache/doris/qe/SessionVariable.java  | 13 ++++
 .../main/java/org/apache/doris/qe/VariableMgr.java | 71 ++--------------------
 4 files changed, 33 insertions(+), 77 deletions(-)

diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 51efbd4cb79..13edbdccc73 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -868,9 +868,9 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
         buf = c.type->serialize(*(c.column), buf, pblock->be_exec_version());
     }
     *uncompressed_bytes = content_uncompressed_size;
-    const size_t serialize_bytes = buf - column_values.data();
+    const size_t serialize_bytes = buf - column_values.data() + 
STREAMVBYTE_PADDING;
     *compressed_bytes = serialize_bytes;
-    column_values.resize(serialize_bytes + STREAMVBYTE_PADDING);
+    column_values.resize(serialize_bytes);
 
     // compress
     if (compression_type != segment_v2::NO_COMPRESSION && 
content_uncompressed_size > 0) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 632597bf6df..f6fd621e4eb 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -229,6 +229,7 @@ import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.GlobalVariable;
 import org.apache.doris.qe.JournalObservable;
 import org.apache.doris.qe.QueryCancelWorker;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.VariableMgr;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
@@ -1428,6 +1429,7 @@ public class Env {
         }
     }
 
+    @SuppressWarnings({"checkstyle:WhitespaceAfter", "checkstyle:LineLength"})
     private void transferToMaster() {
         // stop replayer
         if (replayer != null) {
@@ -1495,24 +1497,24 @@ public class Env {
                 // because the default parallelism of pipeline engine is 
higher than previous version.
                 // so set parallel_pipeline_task_num to 
parallel_fragment_exec_instance_num
                 int newVal = 
VariableMgr.newSessionVariable().parallelExecInstanceNum;
-                VariableMgr.setGlobalPipelineTask(newVal);
-                LOG.info("upgrade FE from 1.x to 2.0, set 
parallel_pipeline_task_num "
-                        + "to parallel_fragment_exec_instance_num: {}", 
newVal);
+                VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", 
SessionVariable.PARALLEL_PIPELINE_TASK_NUM,
+                        String.valueOf(newVal));
 
                 // similar reason as above, need to upgrade broadcast scale 
factor during 1.2 to 2.x
                 // if the default value has been upgraded
                 double newBcFactorVal = 
VariableMgr.newSessionVariable().getBroadcastRightTableScaleFactor();
-                VariableMgr.setGlobalBroadcastScaleFactor(newBcFactorVal);
-                LOG.info("upgrade FE from 1.x to 2.x, set 
broadcast_right_table_scale_factor "
-                        + "to new default value: {}", newBcFactorVal);
+                VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
+                        SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR,
+                        String.valueOf(newBcFactorVal));
 
                 // similar reason as above, need to upgrade 
enable_nereids_planner to true
-                VariableMgr.enableNereidsPlanner();
-                LOG.info("upgrade FE from 1.x to 2.x, set 
enable_nereids_planner to new default value: true");
+                VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", 
SessionVariable.ENABLE_NEREIDS_PLANNER,
+                        "true");
             }
             if (journalVersion <= FeMetaVersion.VERSION_123) {
-                VariableMgr.enableNereidsDml();
-                LOG.info("upgrade FE from 2.0 to 2.1, set enable_nereids_dml 
to new default value: true");
+                VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", 
SessionVariable.ENABLE_NEREIDS_DML, "true");
+                VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
+                        
SessionVariable.FRAGMENT_TRANSMISSION_COMPRESSION_CODEC, "none");
             }
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index fa524257d28..7abe546d53b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -1741,6 +1741,7 @@ public class SessionVariable implements Serializable, 
Writable {
             this.enableFunctionPushdown = true;
             this.enableDeleteSubPredicateV2 = true;
         }
+
         /*
         switch (randomInt) {
             case 0:
@@ -1791,6 +1792,18 @@ public class SessionVariable implements Serializable, 
Writable {
             } else {
                 this.enableFoldConstantByBe = true;
             }
+
+            switch (Config.pull_request_id % 3) {
+                case 0:
+                    this.fragmentTransmissionCompressionCodec = "snappy";
+                    break;
+                case 1:
+                    this.fragmentTransmissionCompressionCodec = "lz4";
+                    break;
+                default:
+                    this.fragmentTransmissionCompressionCodec = "none";
+            }
+
             this.runtimeFilterType = 1 << randomInt;
             this.enableParallelScan = Config.pull_request_id % 2 == 0 ? 
randomInt % 2 == 0 : randomInt % 1 == 0;
             switch (randomInt) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
index e83fd474daf..6e75f17a042 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
@@ -389,84 +389,25 @@ public class VariableMgr {
         }
     }
 
-    public static void setGlobalPipelineTask(int instance) {
+    public static void refreshDefaultSessionVariables(String versionMsg, 
String sessionVar, String value) {
         wlock.lock();
         try {
-            VarContext ctx = 
ctxByVarName.get(SessionVariable.PARALLEL_PIPELINE_TASK_NUM);
+            VarContext ctx = ctxByVarName.get(sessionVar);
             try {
-                setValue(ctx.getObj(), new 
SessionVariableField(ctx.getField()), String.valueOf(instance));
+                setValue(ctx.getObj(), new 
SessionVariableField(ctx.getField()), value);
             } catch (DdlException e) {
-                LOG.warn("failed to set global variable: {}", 
SessionVariable.PARALLEL_PIPELINE_TASK_NUM, e);
+                LOG.warn("failed to set global variable: {}", sessionVar, e);
                 return;
             }
 
             // write edit log
             GlobalVarPersistInfo info = new 
GlobalVarPersistInfo(defaultSessionVariable,
-                    
Lists.newArrayList(SessionVariable.PARALLEL_PIPELINE_TASK_NUM));
-            Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info);
-        } finally {
-            wlock.unlock();
-        }
-    }
-
-    public static void setGlobalBroadcastScaleFactor(double factor) {
-        wlock.lock();
-        try {
-            VarContext ctx = 
ctxByVarName.get(SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR);
-            try {
-                setValue(ctx.getObj(), new 
SessionVariableField(ctx.getField()), String.valueOf(factor));
-            } catch (DdlException e) {
-                LOG.warn("failed to set global variable: {}", 
SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR, e);
-                return;
-            }
-
-            // write edit log
-            GlobalVarPersistInfo info = new 
GlobalVarPersistInfo(defaultSessionVariable,
-                    
Lists.newArrayList(SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR));
-            Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info);
-        } finally {
-            wlock.unlock();
-        }
-    }
-
-    public static void enableNereidsPlanner() {
-        wlock.lock();
-        try {
-            VarContext ctx = 
ctxByVarName.get(SessionVariable.ENABLE_NEREIDS_PLANNER);
-            try {
-                setValue(ctx.getObj(), new 
SessionVariableField(ctx.getField()), String.valueOf(true));
-            } catch (DdlException e) {
-                LOG.warn("failed to set global variable: {}", 
SessionVariable.ENABLE_NEREIDS_PLANNER, e);
-                return;
-            }
-
-            // write edit log
-            GlobalVarPersistInfo info = new 
GlobalVarPersistInfo(defaultSessionVariable,
-                    
Lists.newArrayList(SessionVariable.ENABLE_NEREIDS_PLANNER));
-            Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info);
-        } finally {
-            wlock.unlock();
-        }
-    }
-
-    public static void enableNereidsDml() {
-        wlock.lock();
-        try {
-            VarContext ctx = 
ctxByVarName.get(SessionVariable.ENABLE_NEREIDS_DML);
-            try {
-                setValue(ctx.getObj(), new 
SessionVariableField(ctx.getField()), String.valueOf(true));
-            } catch (DdlException e) {
-                LOG.warn("failed to set global variable: {}", 
SessionVariable.ENABLE_NEREIDS_DML, e);
-                return;
-            }
-
-            // write edit log
-            GlobalVarPersistInfo info = new 
GlobalVarPersistInfo(defaultSessionVariable,
-                    Lists.newArrayList(SessionVariable.ENABLE_NEREIDS_DML));
+                    Lists.newArrayList(sessionVar));
             Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info);
         } finally {
             wlock.unlock();
         }
+        LOG.info("upgrade FE from {}, set {} to new default value: {}", 
versionMsg, sessionVar, value);
     }
 
     public static void setLowerCaseTableNames(int mode) throws DdlException {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to