xiaokang commented on code in PR #16335: URL: https://github.com/apache/doris/pull/16335#discussion_r1097022421
########## fe/fe-core/src/main/cup/sql_parser.cup: ########## @@ -1815,7 +1828,20 @@ create_stmt ::= opt_ext_properties:extProperties {: RESULT = new CreateTableStmt(ifNotExists, isExternal, name, columns, indexes, engineName, keys, partition, - distribution, tblProperties, extProperties, tableComment, index); + distribution, tblProperties, extProperties, tableComment, index, false); + :} + | KW_CREATE opt_external:isExternal KW_TABLE opt_if_not_exists:ifNotExists table_name:name + LPAREN column_definition_list:columns COMMA index_definition_list:indexes COMMA DOTDOTDOT RPAREN opt_engine:engineName Review Comment: can we move ... just after column_definition_list to keep consistent ########## fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java: ########## @@ -271,8 +273,10 @@ public void analyze(boolean isOlap) throws AnalysisException { throw new AnalysisException("Array can only be used in the non-key column of" + " the duplicate table at present."); } - if (defaultValue.isSet && defaultValue != DefaultValue.NULL_DEFAULT_VALUE) { - throw new AnalysisException("Array type column default value only support null"); + if (defaultValue.isSet && defaultValue != DefaultValue.NULL_DEFAULT_VALUE Review Comment: @cambyzju can you check the logic for default value? 
########## fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java: ########## @@ -214,6 +228,161 @@ public TGetDbsResult getDbNames(TGetDbsParams params) throws TException { return result; } + private static ColumnDef initColumnfromThrift(TColumnDesc tColumnDesc, String comment) { + TypeDef typeDef = TypeDef.createTypeDef(tColumnDesc); + boolean isAllowNull = tColumnDesc.isIsAllowNull(); + ColumnDef.DefaultValue defaultVal = ColumnDef.DefaultValue.NOT_SET; + // Dynamic table's Array default value should be '[]' + if (typeDef.getType().isArrayType()) { + defaultVal = ColumnDef.DefaultValue.ARRAY_EMPTY_DEFAULT_VALUE; + } + return new ColumnDef(tColumnDesc.getColumnName(), typeDef, false, null, isAllowNull, defaultVal, + comment, true); + } + + @Override + public TAddColumnsResult addColumns(TAddColumnsRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.debug("schema change clientAddr: {}, request: {}", clientAddr, request); + + TStatus status = new TStatus(TStatusCode.OK); + List<TColumn> allColumns = new ArrayList<TColumn>(); + + Env env = Env.getCurrentEnv(); + InternalCatalog catalog = env.getInternalCatalog(); + int schemaVersion = 0; + try { + if (!env.isMaster()) { + status.setStatusCode(TStatusCode.ILLEGAL_STATE); + status.addToErrorMsgs("retry rpc request to master."); Review Comment: why not forward to master fe? ########## fe/fe-core/src/main/java/org/apache/doris/load/Load.java: ########## @@ -720,20 +720,46 @@ private static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs, slotDesc.setIsNullable(tblColumn.isAllowNull()); } } else { - // columns default be varchar type - slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR)); - slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR)); - // ISSUE A: src slot should be nullable even if the column is not nullable. - // because src slot is what we read from file, not represent to real column value. 
- // If column is not nullable, error will be thrown when filling the dest slot, - // which is not nullable. - slotDesc.setIsNullable(true); + if (formatType == TFileFormatType.FORMAT_JSON + && tbl instanceof OlapTable && ((OlapTable) tbl).isDynamicSchema()) { + slotDesc.setType(tblColumn.getType()); + slotDesc.setColumn(new Column(realColName, tblColumn.getType())); + slotDesc.setIsNullable(tblColumn.isAllowNull()); + } else { + // columns default be varchar type + slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR)); + slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR)); + // ISSUE A: src slot should be nullable even if the column is not nullable. + // because src slot is what we read from file, not represent to real column value. + // If column is not nullable, error will be thrown when filling the dest slot, + // which is not nullable. + slotDesc.setIsNullable(true); + } } slotDesc.setIsMaterialized(true); srcSlotIds.add(slotDesc.getId().asInt()); slotDescByName.put(realColName, slotDesc); } } + + // add a implict container column "__dynamic__" for dynamic columns Review Comment: __DORIS_DYNAMIC_COL__ ########## fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java: ########## @@ -214,6 +228,161 @@ public TGetDbsResult getDbNames(TGetDbsParams params) throws TException { return result; } + private static ColumnDef initColumnfromThrift(TColumnDesc tColumnDesc, String comment) { + TypeDef typeDef = TypeDef.createTypeDef(tColumnDesc); + boolean isAllowNull = tColumnDesc.isIsAllowNull(); + ColumnDef.DefaultValue defaultVal = ColumnDef.DefaultValue.NOT_SET; + // Dynamic table's Array default value should be '[]' + if (typeDef.getType().isArrayType()) { + defaultVal = ColumnDef.DefaultValue.ARRAY_EMPTY_DEFAULT_VALUE; + } + return new ColumnDef(tColumnDesc.getColumnName(), typeDef, false, null, isAllowNull, defaultVal, + comment, true); + } + + @Override + public TAddColumnsResult addColumns(TAddColumnsRequest request) 
throws TException { + String clientAddr = getClientAddrAsString(); + LOG.debug("schema change clientAddr: {}, request: {}", clientAddr, request); + + TStatus status = new TStatus(TStatusCode.OK); + List<TColumn> allColumns = new ArrayList<TColumn>(); + + Env env = Env.getCurrentEnv(); + InternalCatalog catalog = env.getInternalCatalog(); + int schemaVersion = 0; + try { + if (!env.isMaster()) { + status.setStatusCode(TStatusCode.ILLEGAL_STATE); + status.addToErrorMsgs("retry rpc request to master."); + TAddColumnsResult result = new TAddColumnsResult(status, request.getTableId(), allColumns, 0); + LOG.debug("result: {}", result); + return result; + } + TableName tableName = new TableName("", request.getDbName(), request.getTableName()); + if (request.getTableId() > 0) { + tableName = catalog.getTableNameByTableId(request.getTableId()); + } + if (tableName == null) { + throw new MetaNotFoundException("table_id " + request.getTableId() + " does not exist"); + } + + Database db = catalog.getDbNullable(tableName.getDb()); + if (db == null) { + throw new MetaNotFoundException("db " + tableName.getDb() + " does not exist"); + } + + List<TColumnDef> addColumns = request.getAddColumns(); + boolean queryMode = false; + if (addColumns == null || addColumns.size() == 0) { + queryMode = true; + } + + // rpc only olap table + OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName.getTbl(), TableType.OLAP); + olapTable.writeLockOrMetaException(); + + try { + List<ColumnDef> columnDefs = new ArrayList<ColumnDef>(); + + // prepare columnDefs + for (TColumnDef tColumnDef : addColumns) { + if (request.isTypeConflictFree()) { Review Comment: The name 'isTypeConflictFree' is not so intuitive. 'allowTypeConflict' may be better. 
########## fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java: ########## @@ -70,6 +70,8 @@ public class TupleDescriptor { private float avgSerializedSize; // in bytes; includes serialization overhead + private int tableId = -1; Review Comment: What is the purpose of assigning tableId in TupleDescriptor? ########## fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java: ########## @@ -169,6 +170,23 @@ public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException { } } + if (destTable.isDynamicSchema()) { Review Comment: Add a comment to explain how this differs from Load.initColumns(). ########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java: ########## @@ -143,6 +144,21 @@ public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesLis } } + if (table.isDynamicSchema()) { Review Comment: Add a comment to explain how this differs from Load.initColumns(). ########## fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java: ########## @@ -214,6 +228,161 @@ public TGetDbsResult getDbNames(TGetDbsParams params) throws TException { return result; } + private static ColumnDef initColumnfromThrift(TColumnDesc tColumnDesc, String comment) { + TypeDef typeDef = TypeDef.createTypeDef(tColumnDesc); + boolean isAllowNull = tColumnDesc.isIsAllowNull(); + ColumnDef.DefaultValue defaultVal = ColumnDef.DefaultValue.NOT_SET; + // Dynamic table's Array default value should be '[]' + if (typeDef.getType().isArrayType()) { + defaultVal = ColumnDef.DefaultValue.ARRAY_EMPTY_DEFAULT_VALUE; + } + return new ColumnDef(tColumnDesc.getColumnName(), typeDef, false, null, isAllowNull, defaultVal, + comment, true); + } + + @Override + public TAddColumnsResult addColumns(TAddColumnsRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.debug("schema change clientAddr: {}, request: {}", clientAddr, request); + + TStatus status = new TStatus(TStatusCode.OK); + List<TColumn>
allColumns = new ArrayList<TColumn>(); + + Env env = Env.getCurrentEnv(); + InternalCatalog catalog = env.getInternalCatalog(); + int schemaVersion = 0; + try { + if (!env.isMaster()) { + status.setStatusCode(TStatusCode.ILLEGAL_STATE); + status.addToErrorMsgs("retry rpc request to master."); + TAddColumnsResult result = new TAddColumnsResult(status, request.getTableId(), allColumns, 0); + LOG.debug("result: {}", result); + return result; + } + TableName tableName = new TableName("", request.getDbName(), request.getTableName()); + if (request.getTableId() > 0) { + tableName = catalog.getTableNameByTableId(request.getTableId()); + } + if (tableName == null) { + throw new MetaNotFoundException("table_id " + request.getTableId() + " does not exist"); + } + + Database db = catalog.getDbNullable(tableName.getDb()); + if (db == null) { + throw new MetaNotFoundException("db " + tableName.getDb() + " does not exist"); + } + + List<TColumnDef> addColumns = request.getAddColumns(); + boolean queryMode = false; + if (addColumns == null || addColumns.size() == 0) { + queryMode = true; + } + + // rpc only olap table + OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName.getTbl(), TableType.OLAP); + olapTable.writeLockOrMetaException(); + + try { + List<ColumnDef> columnDefs = new ArrayList<ColumnDef>(); + + // prepare columnDefs + for (TColumnDef tColumnDef : addColumns) { + if (request.isTypeConflictFree()) { + // ignore column with same name + boolean hasSameNameColumn = false; + for (Column column : olapTable.getBaseSchema()) { + if (column.getName().equals(tColumnDef.getColumnDesc().getColumnName())) { + hasSameNameColumn = true; + } + } + // ignore this column + if (hasSameNameColumn) { + continue; Review Comment: How to process write if the type is not the same as the existed column? 
########## fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java: ########## @@ -214,6 +228,161 @@ public TGetDbsResult getDbNames(TGetDbsParams params) throws TException { return result; } + private static ColumnDef initColumnfromThrift(TColumnDesc tColumnDesc, String comment) { + TypeDef typeDef = TypeDef.createTypeDef(tColumnDesc); + boolean isAllowNull = tColumnDesc.isIsAllowNull(); + ColumnDef.DefaultValue defaultVal = ColumnDef.DefaultValue.NOT_SET; + // Dynamic table's Array default value should be '[]' + if (typeDef.getType().isArrayType()) { + defaultVal = ColumnDef.DefaultValue.ARRAY_EMPTY_DEFAULT_VALUE; + } + return new ColumnDef(tColumnDesc.getColumnName(), typeDef, false, null, isAllowNull, defaultVal, + comment, true); + } + + @Override + public TAddColumnsResult addColumns(TAddColumnsRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.debug("schema change clientAddr: {}, request: {}", clientAddr, request); + + TStatus status = new TStatus(TStatusCode.OK); + List<TColumn> allColumns = new ArrayList<TColumn>(); + + Env env = Env.getCurrentEnv(); + InternalCatalog catalog = env.getInternalCatalog(); + int schemaVersion = 0; + try { + if (!env.isMaster()) { + status.setStatusCode(TStatusCode.ILLEGAL_STATE); + status.addToErrorMsgs("retry rpc request to master."); + TAddColumnsResult result = new TAddColumnsResult(status, request.getTableId(), allColumns, 0); + LOG.debug("result: {}", result); + return result; + } + TableName tableName = new TableName("", request.getDbName(), request.getTableName()); + if (request.getTableId() > 0) { + tableName = catalog.getTableNameByTableId(request.getTableId()); + } + if (tableName == null) { + throw new MetaNotFoundException("table_id " + request.getTableId() + " does not exist"); + } + + Database db = catalog.getDbNullable(tableName.getDb()); + if (db == null) { + throw new MetaNotFoundException("db " + tableName.getDb() + " does not exist"); + } + + 
List<TColumnDef> addColumns = request.getAddColumns(); + boolean queryMode = false; + if (addColumns == null || addColumns.size() == 0) { + queryMode = true; + } + + // rpc only olap table + OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName.getTbl(), TableType.OLAP); + olapTable.writeLockOrMetaException(); + + try { + List<ColumnDef> columnDefs = new ArrayList<ColumnDef>(); + + // prepare columnDefs + for (TColumnDef tColumnDef : addColumns) { + if (request.isTypeConflictFree()) { + // ignore column with same name + boolean hasSameNameColumn = false; + for (Column column : olapTable.getBaseSchema()) { + if (column.getName().equals(tColumnDef.getColumnDesc().getColumnName())) { + hasSameNameColumn = true; + } + } + // ignore this column + if (hasSameNameColumn) { + continue; + } + } + String comment = tColumnDef.getComment(); + if (comment == null || comment.length() == 0) { + Instant ins = Instant.ofEpochSecond(System.currentTimeMillis() / 1000); + ZonedDateTime zdt = ins.atZone(ZoneId.systemDefault()); + comment = "auto change " + zdt.toString(); + } + + TColumnDesc tColumnDesc = tColumnDef.getColumnDesc(); + ColumnDef columnDef = initColumnfromThrift(tColumnDesc, comment); + columnDefs.add(columnDef); + } + + if (!queryMode && !columnDefs.isEmpty()) { + //3.create AddColumnsClause Review Comment: there is no comment for step 1,2. ########## gensrc/thrift/Types.thrift: ########## @@ -92,13 +92,16 @@ enum TPrimitiveType { DECIMAL128I, JSONB, UNSUPPORTED + DECIMAL128, Review Comment: why add DECIMAL128 here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org