xiaokang commented on code in PR #17494:
URL: https://github.com/apache/doris/pull/17494#discussion_r1131787424


##########
fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaChangeExpr.java:
##########
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.VariantType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.thrift.TExpr;
+import org.apache.doris.thrift.TExprNode;
+import org.apache.doris.thrift.TExprNodeType;
+import org.apache.doris.thrift.TSchemaChangeInfo;
+import org.apache.doris.thrift.TTypeDesc;
+import org.apache.doris.thrift.TTypeNode;
+
+import com.google.common.base.Preconditions;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+
+public class SchemaChangeExpr extends Expr {
+    private static final Logger LOG = 
LogManager.getLogger(SchemaChangeExpr.class);
+    // Target table id
+    private int tableId;
+    private SlotRef variantSlot;
+
+    public SchemaChangeExpr(SlotRef sourceSlot, int tableId) {
+        super();
+        Preconditions.checkNotNull(sourceSlot);
+        variantSlot = sourceSlot;
+        this.tableId = tableId;
+    }
+
+    @Override
+    protected void treeToThriftHelper(TExpr container) {
+        super.treeToThriftHelper(container);
+    }
+
+    @Override
+    protected void toThrift(TExprNode msg) {
+        TSchemaChangeInfo schemaInfo = new TSchemaChangeInfo();
+        schemaInfo.setTableId(tableId);
+        msg.setSchemaChangeInfo(schemaInfo);
+        // set src variant slot
+        variantSlot.toThrift(msg);
+        msg.node_type = TExprNodeType.SCHEMA_CHANGE_EXPR;
+        // set type info
+        TTypeDesc desc = new TTypeDesc();
+        desc.setTypes(new ArrayList<TTypeNode>());
+        VariantType variant = new VariantType();
+        variant.toThrift(desc);
+        msg.setType(desc);
+    }
+
+    @Override
+    public Expr clone() {
+        return new SchemaChangeExpr((SlotRef) getChild(0), tableId);
+    }
+
+    @Override
+    public String toSqlImpl() {
+        return "SCHEMA_CHANGE(" + getChild(0).toSql() + ")";
+    }
+
+    @Override
+    public void analyzeImpl(Analyzer analyzer) throws AnalysisException {
+        Type childType = getChild(0).getType();

Review Comment:
   Where is the child set?



##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java:
##########
@@ -154,9 +154,11 @@ public void plan(TUniqueId loadId, 
List<List<TBrokerFileStatus>> fileStatusesLis
             Column col = new Column(Column.DYNAMIC_COLUMN_NAME, Type.VARIANT, 
false, null, false, "",
                                     "stream load auto dynamic column");
             slotDesc.setIsMaterialized(true);
+            // Non-nullable slots will have 0 for the byte offset and -1 for 
the bit mask

Review Comment:
   What's the difference from setIsNullable(true)?



##########
gensrc/thrift/Exprs.thrift:
##########
@@ -214,6 +221,7 @@ struct TExprNode {
   29: optional bool is_nullable
   
   30: optional TJsonLiteral json_literal
+  31: optional TSchemaChangeInfo schema_change_info

Review Comment:
   It's somewhat strange that Expr has schema_change_info.



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaChangeExpr.java:
##########
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.VariantType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.thrift.TExpr;
+import org.apache.doris.thrift.TExprNode;
+import org.apache.doris.thrift.TExprNodeType;
+import org.apache.doris.thrift.TSchemaChangeInfo;
+import org.apache.doris.thrift.TTypeDesc;
+import org.apache.doris.thrift.TTypeNode;
+
+import com.google.common.base.Preconditions;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+
+public class SchemaChangeExpr extends Expr {
+    private static final Logger LOG = 
LogManager.getLogger(SchemaChangeExpr.class);
+    // Target table id
+    private int tableId;
+    private SlotRef variantSlot;
+
+    public SchemaChangeExpr(SlotRef sourceSlot, int tableId) {
+        super();
+        Preconditions.checkNotNull(sourceSlot);
+        variantSlot = sourceSlot;
+        this.tableId = tableId;
+    }
+
+    @Override
+    protected void treeToThriftHelper(TExpr container) {
+        super.treeToThriftHelper(container);
+    }
+
+    @Override
+    protected void toThrift(TExprNode msg) {
+        TSchemaChangeInfo schemaInfo = new TSchemaChangeInfo();
+        schemaInfo.setTableId(tableId);
+        msg.setSchemaChangeInfo(schemaInfo);
+        // set src variant slot
+        variantSlot.toThrift(msg);
+        msg.node_type = TExprNodeType.SCHEMA_CHANGE_EXPR;
+        // set type info
+        TTypeDesc desc = new TTypeDesc();
+        desc.setTypes(new ArrayList<TTypeNode>());

Review Comment:
   What is the purpose of calling setTypes with an empty list?



##########
fe/fe-core/src/main/java/org/apache/doris/load/Load.java:
##########
@@ -756,8 +758,10 @@ private static void initColumns(Table tbl, 
List<ImportColumnDesc> columnExprs,
                                     "stream load auto dynamic column");
             slotDesc.setType(Type.VARIANT);
             slotDesc.setColumn(col);
-            // alaways nullable
-            slotDesc.setIsNullable(true);
+            slotDesc.setIsNullable(false);
+            // Non-nullable slots will have 0 for the byte offset and -1 for 
the bit mask
+            slotDesc.setNullIndicatorBit(-1);
+            slotDesc.setNullIndicatorByte(0);
             slotDesc.setIsMaterialized(true);
             srcSlotIds.add(slotDesc.getId().asInt());
             slotDescByName.put(name, slotDesc);

Review Comment:
   Why delete composeJobInfoByLoadJob() and copy code?



##########
be/src/vec/CMakeLists.txt:
##########
@@ -152,6 +152,7 @@ set(VEC_FILES
   exprs/vcast_expr.cpp
   exprs/vcase_expr.cpp
   exprs/vinfo_func.cpp
+  exprs/vschema_change_expr.cpp
   exprs/table_function/table_function_factory.cpp
   exprs/table_function/vexplode.cpp
   exprs/table_function/vexplode_split.cpp

Review Comment:
   Is it a mistake to delete function_array_popfront and function_array_concat?



##########
be/src/vec/core/block.cpp:
##########
@@ -699,10 +695,6 @@ Block Block::copy_block(const std::vector<int>& 
column_offset) const {
 }
 
 void Block::append_block_by_selector(MutableBlock* dst, const 
IColumn::Selector& selector) const {

Review Comment:
   column->is_exclusive() -> column->use_count() == 1?



##########
be/src/vec/data_types/data_type_factory.cpp:
##########
@@ -380,6 +382,10 @@ DataTypePtr DataTypeFactory::create_data_type(const 
PColumnMeta& pcolumn) {
         nested = std::make_shared<DataTypeStruct>(dataTypes, names);
         break;
     }
+    case PGenericType::VARIANT: {
+        nested = std::make_shared<DataTypeObject>("object", true);
+        break;

Review Comment:
   PGenericType::MAP is modified



##########
be/src/vec/common/schema_util.h:
##########
@@ -43,35 +45,23 @@ DataTypePtr get_base_type_of_array(const DataTypePtr& type);
 /// Returns Array with requested number of dimensions and no scalars.
 Array create_empty_array_field(size_t num_dimensions);
 
-// NOTICE: the last column must be dynamic column
-// 1. The dynamic column will be parsed to ColumnObject and the parsed column 
will
-// be flattened to multiple subcolumns, thus the dynamic schema is infered 
from the
-// dynamic column.
-// 2. Schema change which is add columns will be performed if the infered 
schema is
-// different from the original tablet schema, new columns added to schema 
change history
-Status parse_and_expand_dynamic_column(Block& block, const TabletSchema& 
schema_hints,
-                                       LocalSchemaChangeRecorder* history);
-
-Status parse_object_column(Block& block, size_t position);
-
-Status parse_object_column(ColumnObject& dest, const IColumn& src, bool 
need_finalize,
-                           const int* row_begin, const int* row_end);
+// Cast column to dst type
+Status cast_column(const ColumnWithTypeAndName& arg, const DataTypePtr& type, 
ColumnPtr* result);
 
-// Object column will be flattened and if replace_if_duplicated
+// Object column will be unfolded and if replace_if_duplicated

Review Comment:
   The comment needs to be changed to explain the new arguments.



##########
be/src/vec/columns/column_object.cpp:
##########
@@ -199,13 +199,9 @@ class FieldVisitorToScalarType : public 
StaticVisitor<size_t> {
         return 0;
     }
     size_t operator()(const Int64& x) {
-        // Only support Int32 and Int64
+        // Only Int64 at present

Review Comment:
   Why delete support for Int32?



##########
be/src/vec/exec/scan/vfile_scanner.cpp:
##########
@@ -173,11 +149,6 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, 
Block* block, bool* eo
             RETURN_IF_ERROR(
                     _cur_reader->get_next_block(_src_block_ptr, &read_rows, 
&_cur_reader_eof));
         }
-
-        if (_is_dynamic_schema) {
-            RETURN_IF_ERROR(_handle_dynamic_block(_src_block_ptr));

Review Comment:
   where is the equivalent logic now?



##########
fe/fe-core/src/main/java/org/apache/doris/load/Load.java:
##########
@@ -756,8 +758,10 @@ private static void initColumns(Table tbl, 
List<ImportColumnDesc> columnExprs,
                                     "stream load auto dynamic column");
             slotDesc.setType(Type.VARIANT);
             slotDesc.setColumn(col);
-            // alaways nullable
-            slotDesc.setIsNullable(true);
+            slotDesc.setIsNullable(false);
+            // Non-nullable slots will have 0 for the byte offset and -1 for 
the bit mask
+            slotDesc.setNullIndicatorBit(-1);
+            slotDesc.setNullIndicatorByte(0);
             slotDesc.setIsMaterialized(true);
             srcSlotIds.add(slotDesc.getId().asInt());
             slotDescByName.put(name, slotDesc);

Review Comment:
   This comment is about changes much further below, since those lines
cannot be commented on directly.



##########
be/src/vec/exec/scan/vfile_scanner.h:
##########
@@ -114,8 +114,6 @@ class VFileScanner : public VScanner {
 

Review Comment:
   _src_block_mem_reuse ?



##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -412,14 +417,24 @@ Status NewJsonReader::_parse_dynamic_json(bool* 
is_empty_row, bool* eof,
     }
 
     _bytes_read_counter += size;
-
+    MutableColumnPtr& dynamic_column = columns.back();
+    auto& column_object = 
assert_cast<vectorized::ColumnObject&>(*(dynamic_column.get()));
+    Defer __finalize_clousure([&] {
+        // Reached buffer size, unfold intermediate column object
+        size_t batch_size = std::max(_state->batch_size(), 
(int)_MIN_BATCH_SIZE);
+        if (column_object.size() >= batch_size || _reader_eof) {
+            column_object.finalize();
+            // flatten object columns for the purpose of extracting static 
columns and
+            // fill default values missing in static columns
+            schema_util::unfold_object(columns.size() - 1, columns, 
_slot_desc_index, slot_descs,

Review Comment:
   There was no flatten_object before; why add unfold_object?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to