This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 8b349155182 [Fix](compress) Fix occasional crushes when serializing blocks (#32672) 8b349155182 is described below commit 8b34915518206ad705d3a6f9ca8cc2a27e3b4e9f Author: HappenLee <happen...@hotmail.com> AuthorDate: Sat Mar 23 00:36:34 2024 +0800 [Fix](compress) Fix occasional crushes when serializing blocks (#32672) --- be/src/vec/core/block.cpp | 4 +- .../main/java/org/apache/doris/catalog/Env.java | 22 ++++--- .../java/org/apache/doris/qe/SessionVariable.java | 13 ++++ .../main/java/org/apache/doris/qe/VariableMgr.java | 71 ++-------------------- 4 files changed, 33 insertions(+), 77 deletions(-) diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 51efbd4cb79..13edbdccc73 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -868,9 +868,9 @@ Status Block::serialize(int be_exec_version, PBlock* pblock, buf = c.type->serialize(*(c.column), buf, pblock->be_exec_version()); } *uncompressed_bytes = content_uncompressed_size; - const size_t serialize_bytes = buf - column_values.data(); + const size_t serialize_bytes = buf - column_values.data() + STREAMVBYTE_PADDING; *compressed_bytes = serialize_bytes; - column_values.resize(serialize_bytes + STREAMVBYTE_PADDING); + column_values.resize(serialize_bytes); // compress if (compression_type != segment_v2::NO_COMPRESSION && content_uncompressed_size > 0) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 632597bf6df..f6fd621e4eb 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -229,6 +229,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.GlobalVariable; import org.apache.doris.qe.JournalObservable; import org.apache.doris.qe.QueryCancelWorker; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.VariableMgr; import org.apache.doris.resource.Tag; import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; @@ -1428,6 +1429,7 @@ public class Env { } } + @SuppressWarnings({"checkstyle:WhitespaceAfter", "checkstyle:LineLength"}) private void transferToMaster() { // stop replayer if (replayer != null) { @@ -1495,24 +1497,24 @@ public class Env { // because the default parallelism of pipeline engine is higher than previous version. // so set parallel_pipeline_task_num to parallel_fragment_exec_instance_num int newVal = VariableMgr.newSessionVariable().parallelExecInstanceNum; - VariableMgr.setGlobalPipelineTask(newVal); - LOG.info("upgrade FE from 1.x to 2.0, set parallel_pipeline_task_num " - + "to parallel_fragment_exec_instance_num: {}", newVal); + VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.PARALLEL_PIPELINE_TASK_NUM, + String.valueOf(newVal)); // similar reason as above, need to upgrade broadcast scale factor during 1.2 to 2.x // if the default value has been upgraded double newBcFactorVal = VariableMgr.newSessionVariable().getBroadcastRightTableScaleFactor(); - VariableMgr.setGlobalBroadcastScaleFactor(newBcFactorVal); - LOG.info("upgrade FE from 1.x to 2.x, set broadcast_right_table_scale_factor " - + "to new default value: {}", newBcFactorVal); + VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", + SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR, + String.valueOf(newBcFactorVal)); // similar reason as above, need to upgrade enable_nereids_planner to true - VariableMgr.enableNereidsPlanner(); - LOG.info("upgrade FE from 1.x to 2.x, set enable_nereids_planner to new default value: true"); + VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.ENABLE_NEREIDS_PLANNER, + "true"); } if (journalVersion <= FeMetaVersion.VERSION_123) { - VariableMgr.enableNereidsDml(); - LOG.info("upgrade FE from 2.0 to 2.1, set enable_nereids_dml to new default value: true"); + VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", SessionVariable.ENABLE_NEREIDS_DML, "true"); + VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", + SessionVariable.FRAGMENT_TRANSMISSION_COMPRESSION_CODEC, "none"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index fa524257d28..7abe546d53b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -1741,6 +1741,7 @@ public class SessionVariable implements Serializable, Writable { this.enableFunctionPushdown = true; this.enableDeleteSubPredicateV2 = true; } + /* switch (randomInt) { case 0: @@ -1791,6 +1792,18 @@ public class SessionVariable implements Serializable, Writable { } else { this.enableFoldConstantByBe = true; } + + switch (Config.pull_request_id % 3) { + case 0: + this.fragmentTransmissionCompressionCodec = "snappy"; + break; + case 1: + this.fragmentTransmissionCompressionCodec = "lz4"; + break; + default: + this.fragmentTransmissionCompressionCodec = "none"; + } + this.runtimeFilterType = 1 << randomInt; this.enableParallelScan = Config.pull_request_id % 2 == 0 ? randomInt % 2 == 0 : randomInt % 1 == 0; switch (randomInt) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java index e83fd474daf..6e75f17a042 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java @@ -389,84 +389,25 @@ public class VariableMgr { } } - public static void setGlobalPipelineTask(int instance) { + public static void refreshDefaultSessionVariables(String versionMsg, String sessionVar, String value) { wlock.lock(); try { - VarContext ctx = ctxByVarName.get(SessionVariable.PARALLEL_PIPELINE_TASK_NUM); + VarContext ctx = ctxByVarName.get(sessionVar); try { - setValue(ctx.getObj(), new SessionVariableField(ctx.getField()), String.valueOf(instance)); + setValue(ctx.getObj(), new SessionVariableField(ctx.getField()), value); } catch (DdlException e) { - LOG.warn("failed to set global variable: {}", SessionVariable.PARALLEL_PIPELINE_TASK_NUM, e); + LOG.warn("failed to set global variable: {}", sessionVar, e); return; } // write edit log GlobalVarPersistInfo info = new GlobalVarPersistInfo(defaultSessionVariable, - Lists.newArrayList(SessionVariable.PARALLEL_PIPELINE_TASK_NUM)); - Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info); - } finally { - wlock.unlock(); - } - } - - public static void setGlobalBroadcastScaleFactor(double factor) { - wlock.lock(); - try { - VarContext ctx = ctxByVarName.get(SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR); - try { - setValue(ctx.getObj(), new SessionVariableField(ctx.getField()), String.valueOf(factor)); - } catch (DdlException e) { - LOG.warn("failed to set global variable: {}", SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR, e); - return; - } - - // write edit log - GlobalVarPersistInfo info = new GlobalVarPersistInfo(defaultSessionVariable, - Lists.newArrayList(SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR)); - Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info); - } finally { - wlock.unlock(); - } - } - - public static void enableNereidsPlanner() { - wlock.lock(); - try { - VarContext ctx = ctxByVarName.get(SessionVariable.ENABLE_NEREIDS_PLANNER); - try { - setValue(ctx.getObj(), new SessionVariableField(ctx.getField()), String.valueOf(true)); - } catch (DdlException e) { - LOG.warn("failed to set global variable: {}", SessionVariable.ENABLE_NEREIDS_PLANNER, e); - return; - } - - // write edit log - GlobalVarPersistInfo info = new GlobalVarPersistInfo(defaultSessionVariable, - Lists.newArrayList(SessionVariable.ENABLE_NEREIDS_PLANNER)); - Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info); - } finally { - wlock.unlock(); - } - } - - public static void enableNereidsDml() { - wlock.lock(); - try { - VarContext ctx = ctxByVarName.get(SessionVariable.ENABLE_NEREIDS_DML); - try { - setValue(ctx.getObj(), new SessionVariableField(ctx.getField()), String.valueOf(true)); - } catch (DdlException e) { - LOG.warn("failed to set global variable: {}", SessionVariable.ENABLE_NEREIDS_DML, e); - return; - } - - // write edit log - GlobalVarPersistInfo info = new GlobalVarPersistInfo(defaultSessionVariable, - Lists.newArrayList(SessionVariable.ENABLE_NEREIDS_DML)); + Lists.newArrayList(sessionVar)); Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info); } finally { wlock.unlock(); } + LOG.info("upgrade FE from {}, set {} to new default value: {}", versionMsg, sessionVar, value); } public static void setLowerCaseTableNames(int mode) throws DdlException { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org