xiaokang commented on code in PR #27764:
URL: https://github.com/apache/doris/pull/27764#discussion_r1413560894


##########
gensrc/proto/internal_service.proto:
##########
@@ -802,6 +802,23 @@ message PGetWalQueueSizeResponse{
     optional int64 size = 2;
 }
 
+message PTabletsLocation {
+    required string host = 1;
+    required int32  brpc_port = 2;
+    repeated int64  tablet_id = 3;
+}
+
+message PFetchRemoteSchemaRequest {
+    repeated PTabletsLocation tablet_location = 1;

Review Comment:
   Why is this field repeated?



##########
fe/fe-core/src/main/cup/sql_parser.cup:
##########
@@ -4359,9 +4359,9 @@ opt_explain_options ::=
 
 // Describe statement
 describe_stmt ::=
-    describe_command table_name:table
+    describe_command table_name:table opt_partition_names:partitionNames

Review Comment:
   Please also implement this for the new optimizer.



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java:
##########
@@ -178,9 +192,21 @@ public void analyze(Analyzer analyzer) throws UserException {
                 if (table.getType() == TableType.OLAP) {
                     procString += ((OlapTable) table).getBaseIndexId();
                 } else {
+                    if (partitionNames != null) {
+                        throw new AnalysisException("Describe table[" + dbTableName.getTbl() + "] failed");

Review Comment:
   Please add a detailed reason to the error message.
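   For example, something along these lines (the exact wording is only a suggestion):
   ```java
   if (partitionNames != null) {
       // tell the user why DESCRIBE ... PARTITION is rejected for this table
       throw new AnalysisException("Describe table[" + dbTableName.getTbl()
               + "] failed: DESCRIBE with PARTITION is only supported for OLAP tables, "
               + "current table type is " + table.getType());
   }
   ```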



##########
fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java:
##########
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.proc;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Set;
+
+/*
+ * SHOW PROC /dbs/dbId/tableId/index_schema/indexId/partitionName"
+ * show index schema
+ */
+public class RemoteIndexSchemaProcNode implements ProcNodeInterface {
+    public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
+            .add("Field").add("Type").add("Null").add("Key")
+            .add("Default").add("Extra")
+            .build();
+
+    private List<Partition> partitions;
+    private List<Column> schema;
+    private Set<String> bfColumns;
+
+    public RemoteIndexSchemaProcNode(List<Partition> partitions, List<Column> schema, Set<String> bfColumns) {
+        this.partitions = partitions;
+        this.schema = schema;
+        this.bfColumns = bfColumns;
+    }
+
+    @Override
+    public ProcResult fetchResult() throws AnalysisException {

Review Comment:
   This duplicates RemoteIndexSchemaProcDir except for the partitions handling. Can you merge the two by passing the partitions as an argument?
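   A rough sketch of what I mean, with a shared helper that both classes delegate to; the partition list would be the only argument that differs (the method name and placement are just illustrative):
   ```java
   // both RemoteIndexSchemaProcDir and RemoteIndexSchemaProcNode could call this,
   // so the fetch logic lives in one place and only the partitions passed in differ
   static List<Column> fetchRemoteSchema(List<Partition> partitions) {
       List<Tablet> tablets = Lists.newArrayList();
       for (Partition partition : partitions) {
           for (MaterializedIndex index
                   : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) {
               tablets.addAll(index.getTablets());
           }
       }
       return new FetchRemoteTabletSchemaUtil(tablets).fetch();
   }
   ```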



##########
docs/zh-CN/docs/advanced/variables.md:
##########
@@ -689,6 +689,10 @@ try (Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030/
  是否在对insert into语句启用部分列更新的语义,默认为 false。需要注意的是,控制insert语句是否开启严格模式的会话变量`enable_insert_strict`的默认值为true,即insert语句默认开启严格模式,而在严格模式下进行部分列更新不允许更新不存在的key。所以,在使用insert语句进行部分列更新的时候如果希望能插入不存在的key,需要在`enable_unique_key_partial_update`设置为true的基础上同时将`enable_insert_strict`设置为false。
   </version>
 
+* `describe_extend_variant_column`
+
+  是否展示 variant 的拆解列。默认为 false。

Review Comment:
   Let's agree on a single term for "拆解列" (the columns extracted from a variant) and use it consistently.



##########
fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java:
##########
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.AggregateType;
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MapType;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.proto.InternalService.PFetchRemoteSchemaRequest;
+import org.apache.doris.proto.InternalService.PFetchRemoteSchemaResponse;
+import org.apache.doris.proto.InternalService.PTabletsLocation;
+import org.apache.doris.proto.OlapFile.ColumnPB;
+import org.apache.doris.proto.OlapFile.TabletSchemaPB;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+// This class is used to pull the specified tablets' columns existing on the Backend (BE)
+// including regular columns and columns decomposed by variants
+public class FetchRemoteTabletSchemaUtil {
+    private static final Logger LOG = LogManager.getLogger(FetchRemoteTabletSchemaUtil.class);
+
+    private List<Tablet> remoteTablets;
+    private List<Column> tableColumns;
+
+    public FetchRemoteTabletSchemaUtil(List<Tablet> tablets) {
+        this.remoteTablets = tablets;
+        this.tableColumns = Lists.newArrayList();
+    }
+
+    public List<Column> fetch() {
+        // 1. Find which Backend (BE) servers the tablets are on
+        Preconditions.checkNotNull(remoteTablets);
+        Map<Long, Set<Long>> beIdToTabletId = Maps.newHashMap();
+        for (Tablet tablet : remoteTablets) {
+            for (Replica replica : tablet.getReplicas()) {
+                // only need alive replica
+                if (replica.isAlive()) {
+                    Set<Long> tabletIds = beIdToTabletId.computeIfAbsent(
+                                    replica.getBackendId(), k -> Sets.newHashSet());
+                    tabletIds.add(tablet.getId());
+                }
+            }
+        }
+
+        // 2. Randomly select 2 Backend (BE) servers to act as coordinators.

Review Comment:
   Why do this in the BE instead of directly in the FE? Is the redundancy really necessary, given that a plain query is only processed once?



##########
fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java:
##########
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.AggregateType;
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MapType;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.proto.InternalService.PFetchRemoteSchemaRequest;
+import org.apache.doris.proto.InternalService.PFetchRemoteSchemaResponse;
+import org.apache.doris.proto.InternalService.PTabletsLocation;
+import org.apache.doris.proto.OlapFile.ColumnPB;
+import org.apache.doris.proto.OlapFile.TabletSchemaPB;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+// This class is used to pull the specified tablets' columns existing on the 
Backend (BE)
+// including regular columns and columns decomposed by variants
+public class FetchRemoteTabletSchemaUtil {
+    private static final Logger LOG = 
LogManager.getLogger(FetchRemoteTabletSchemaUtil.class);
+
+    private List<Tablet> remoteTablets;
+    private List<Column> tableColumns;
+
+    public FetchRemoteTabletSchemaUtil(List<Tablet> tablets) {
+        this.remoteTablets = tablets;
+        this.tableColumns = Lists.newArrayList();
+    }
+
+    public List<Column> fetch() {
+        // 1. Find which Backend (BE) servers the tablets are on
+        Preconditions.checkNotNull(remoteTablets);
+        Map<Long, Set<Long>> beIdToTabletId = Maps.newHashMap();
+        for (Tablet tablet : remoteTablets) {
+            for (Replica replica : tablet.getReplicas()) {
+                // only need alive replica
+                if (replica.isAlive()) {
+                    Set<Long> tabletIds = beIdToTabletId.computeIfAbsent(
+                                    replica.getBackendId(), k -> 
Sets.newHashSet());
+                    tabletIds.add(tablet.getId());
+                }
+            }
+        }
+
+        // 2. Randomly select 2 Backend (BE) servers to act as coordinators.
+        // Coordinator BE is responsible for collecting all table columns and 
returning to the FE.
+        // Two BE provide a retry opportunity with the second one in case the 
first attempt fails.
+        List<PTabletsLocation> locations = Lists.newArrayList();
+        List<Backend> coordinatorBackend = Lists.newArrayList();
+        for (Map.Entry<Long, Set<Long>> entry : beIdToTabletId.entrySet()) {
+            Long backendId = entry.getKey();
+            Set<Long> tabletIds = entry.getValue();
+            Backend backend = 
Env.getCurrentEnv().getCurrentSystemInfo().getBackend(backendId);
+            // only need alive be
+            if (!backend.isAlive()) {
+                continue;
+            }
+            // need 2 be to provide a retry
+            if (coordinatorBackend.size() < 2) {
+                coordinatorBackend.add(backend);
+            }
+            PTabletsLocation.Builder locationBuilder = 
PTabletsLocation.newBuilder()
+                                                        
.setHost(backend.getHost())
+                                                        
.setBrpcPort(backend.getBrpcPort());
+            PTabletsLocation location = 
locationBuilder.addAllTabletId(tabletIds).build();
+            locations.add(location);
+        }
+        PFetchRemoteSchemaRequest.Builder requestBuilder = 
PFetchRemoteSchemaRequest.newBuilder()
+                                                                    
.addAllTabletLocation(locations)
+                                                                    
.setIsCoordinator(true);
+        // 3. Send rpc to coordinatorBackend util succeed or retry
+        for (Backend be : coordinatorBackend) {
+            try {
+                PFetchRemoteSchemaRequest request = requestBuilder.build();
+                Future<PFetchRemoteSchemaResponse> future = 
BackendServiceProxy.getInstance()
+                                            
.fetchRemoteTabletSchemaAsync(be.getBrpcAddress(), request);
+                PFetchRemoteSchemaResponse response = null;
+                try {
+                    response = future.get(60, TimeUnit.SECONDS);
+                    TStatusCode code = 
TStatusCode.findByValue(response.getStatus().getStatusCode());
+                    String errMsg;
+                    if (code != TStatusCode.OK) {
+                        if 
(!response.getStatus().getErrorMsgsList().isEmpty()) {
+                            errMsg = 
response.getStatus().getErrorMsgsList().get(0);
+                        } else {
+                            errMsg = "fetchRemoteTabletSchemaAsync failed. 
backend address: "
+                                    + be.getHost() + " : " + be.getBrpcPort();
+                        }
+                        throw new RpcException(be.getHost(), errMsg);
+                    }
+                    fillColumns(response);
+                    return tableColumns;
+                } catch (AnalysisException e) {
+                    // continue to get result
+                    LOG.warn(e);
+                } catch (InterruptedException e) {
+                    // continue to get result
+                    LOG.warn("fetch remote schema future get interrupted 
Exception");
+                } catch (TimeoutException e) {
+                    future.cancel(true);
+                    // continue to get result
+                    LOG.warn("fetch remote schema result timeout, addr {}", 
be.getBrpcAddress());
+                }
+            }    catch (RpcException e) {
+                LOG.warn("fetch remote schema result rpc exception {}, e {}", 
be.getBrpcAddress(), e);
+            } catch (ExecutionException e) {
+                LOG.warn("fetch remote schema ExecutionException, addr {}, e 
{}", be.getBrpcAddress(), e);
+            }
+        }
+        return tableColumns;
+    }
+
+    private void fillColumns(PFetchRemoteSchemaResponse response) throws AnalysisException {
+        TabletSchemaPB schemaPB = response.getMergedSchema();
+        for (ColumnPB columnPB : schemaPB.getColumnList()) {
+            try {
+                Column remoteColumn = initColumnFromPB(columnPB);
+                tableColumns.add(remoteColumn);
+            } catch (Exception e) {
+                throw new AnalysisException("column default value to string failed");
+            }
+        }
+        int variantColumntIdx = 0;
+        for (Column column : tableColumns) {
+            variantColumntIdx++;
+            if (column.getType().isVariantType()) {
+                break;

Review Comment:
   How are multiple variant columns handled here?
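   With several variant columns (or with ordinary columns after the first variant), taking the index of the first variant and sorting the whole tail will interleave unrelated columns. Purely as a sketch, assuming a sub-column can be recognized by a hypothetical `<variantName>.<path>` naming convention and shows up after its parent in the merged schema:
   ```java
   // group each extracted sub-column under its parent variant column instead of
   // sorting everything that follows the first variant column
   Map<String, List<Column>> subColumnsByVariant = Maps.newLinkedHashMap();
   List<Column> ordered = Lists.newArrayList();
   for (Column column : tableColumns) {
       int dotIdx = column.getName().indexOf('.');
       String parent = dotIdx > 0 ? column.getName().substring(0, dotIdx) : null;
       if (parent != null && subColumnsByVariant.containsKey(parent)) {
           // hypothetical convention: "<variantName>.<path>" marks an extracted sub-column
           subColumnsByVariant.get(parent).add(column);
       } else {
           ordered.add(column);
           if (column.getType().isVariantType()) {
               subColumnsByVariant.put(column.getName(), Lists.newArrayList());
           }
       }
   }
   // rebuild the list: every variant column is followed by its sorted sub-columns
   tableColumns.clear();
   for (Column column : ordered) {
       tableColumns.add(column);
       List<Column> subs = subColumnsByVariant.get(column.getName());
       if (subs != null) {
           subs.sort(Comparator.comparing(Column::getName));
           tableColumns.addAll(subs);
       }
   }
   ```
   If the response could carry the parent column id explicitly, that would be more robust than matching on names.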



##########
be/src/service/internal_service.cpp:
##########
@@ -852,6 +855,110 @@ void PInternalServiceImpl::_get_column_ids_by_tablet_ids(
     response->mutable_status()->set_status_code(TStatusCode::OK);
 }
 
+template <class RPCResponse>
+struct AsyncRPCContext {
+    RPCResponse response;
+    brpc::Controller cntl;
+    brpc::CallId cid;
+};
+
+void PInternalServiceImpl::fetch_remote_tablet_schema(google::protobuf::RpcController* controller,
+                                                      const PFetchRemoteSchemaRequest* request,
+                                                      PFetchRemoteSchemaResponse* response,
+                                                      google::protobuf::Closure* done) {
+    bool ret = _heavy_work_pool.try_offer([request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        Status st = Status::OK();
+        if (request->is_coordinator()) {
+            // Spawn rpc request to none coordinator nodes, and finally merge 
them all
+            PFetchRemoteSchemaRequest remote_request(*request);
+            // set it none coordinator to get merged schema
+            remote_request.set_is_coordinator(false);
+            using PFetchRemoteTabletSchemaRpcContext = 
AsyncRPCContext<PFetchRemoteSchemaResponse>;
+            std::vector<PFetchRemoteTabletSchemaRpcContext> rpc_contexts(
+                    request->tablet_location_size());
+            for (int i = 0; i < request->tablet_location_size(); ++i) {
+                std::string host = request->tablet_location(i).host();
+                int32_t brpc_port = request->tablet_location(i).brpc_port();
+                std::shared_ptr<PBackendService_Stub> stub(
+                        
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
+                                host, brpc_port));
+                rpc_contexts[i].cid = rpc_contexts[i].cntl.call_id();
+                stub->fetch_remote_tablet_schema(&rpc_contexts[i].cntl, 
&remote_request,
+                                                 &rpc_contexts[i].response, 
brpc::DoNothing());
+            }
+            std::vector<TabletSchemaSPtr> schemas;
+            for (auto& rpc_context : rpc_contexts) {
+                brpc::Join(rpc_context.cid);
+                if (!st.ok()) {
+                    // make sure all flying rpc request is joined
+                    continue;
+                }
+                if (rpc_context.cntl.Failed()) {
+                    LOG(WARNING) << "fetch_remote_tablet_schema rpc err:"
+                                 << rpc_context.cntl.ErrorText();
+                    
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
+                            rpc_context.cntl.remote_side());
+                    st = Status::InternalError("fetch_remote_tablet_schema rpc 
err: {}",
+                                               rpc_context.cntl.ErrorText());
+                }
+                if (rpc_context.response.status().status_code() != 0) {
+                    st = Status::create(rpc_context.response.status());
+                }
+                if (rpc_context.response.has_merged_schema()) {
+                    TabletSchemaSPtr schema = std::make_shared<TabletSchema>();
+                    schema->init_from_pb(rpc_context.response.merged_schema());
+                    schemas.push_back(schema);
+                }
+            }
+            if (!schemas.empty() && st.ok()) {
+                // merge all
+                TabletSchemaSPtr merged_schema =
+                        
vectorized::schema_util::get_least_common_schema(schemas, nullptr);
+                VLOG_DEBUG << "dump schema:" << 
merged_schema->dump_structure();
+                merged_schema->to_schema_pb(response->mutable_merged_schema());
+            }
+            st.to_protobuf(response->mutable_status());
+            return;
+        }
+
+        // This is not a coordinator, get it's tablet and merge schema

Review Comment:
   Encapsulate the following in an else branch to make the structure clearer.



##########
fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java:
##########
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.AggregateType;
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MapType;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.proto.InternalService.PFetchRemoteSchemaRequest;
+import org.apache.doris.proto.InternalService.PFetchRemoteSchemaResponse;
+import org.apache.doris.proto.InternalService.PTabletsLocation;
+import org.apache.doris.proto.OlapFile.ColumnPB;
+import org.apache.doris.proto.OlapFile.TabletSchemaPB;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+// This class is used to pull the specified tablets' columns existing on the 
Backend (BE)
+// including regular columns and columns decomposed by variants
+public class FetchRemoteTabletSchemaUtil {
+    private static final Logger LOG = 
LogManager.getLogger(FetchRemoteTabletSchemaUtil.class);
+
+    private List<Tablet> remoteTablets;
+    private List<Column> tableColumns;
+
+    public FetchRemoteTabletSchemaUtil(List<Tablet> tablets) {
+        this.remoteTablets = tablets;
+        this.tableColumns = Lists.newArrayList();
+    }
+
+    public List<Column> fetch() {
+        // 1. Find which Backend (BE) servers the tablets are on
+        Preconditions.checkNotNull(remoteTablets);
+        Map<Long, Set<Long>> beIdToTabletId = Maps.newHashMap();
+        for (Tablet tablet : remoteTablets) {
+            for (Replica replica : tablet.getReplicas()) {
+                // only need alive replica
+                if (replica.isAlive()) {
+                    Set<Long> tabletIds = beIdToTabletId.computeIfAbsent(
+                                    replica.getBackendId(), k -> 
Sets.newHashSet());
+                    tabletIds.add(tablet.getId());
+                }
+            }
+        }
+
+        // 2. Randomly select 2 Backend (BE) servers to act as coordinators.
+        // Coordinator BE is responsible for collecting all table columns and 
returning to the FE.
+        // Two BE provide a retry opportunity with the second one in case the 
first attempt fails.
+        List<PTabletsLocation> locations = Lists.newArrayList();
+        List<Backend> coordinatorBackend = Lists.newArrayList();
+        for (Map.Entry<Long, Set<Long>> entry : beIdToTabletId.entrySet()) {
+            Long backendId = entry.getKey();
+            Set<Long> tabletIds = entry.getValue();
+            Backend backend = 
Env.getCurrentEnv().getCurrentSystemInfo().getBackend(backendId);
+            // only need alive be
+            if (!backend.isAlive()) {
+                continue;
+            }
+            // need 2 be to provide a retry
+            if (coordinatorBackend.size() < 2) {
+                coordinatorBackend.add(backend);
+            }
+            PTabletsLocation.Builder locationBuilder = 
PTabletsLocation.newBuilder()
+                                                        
.setHost(backend.getHost())
+                                                        
.setBrpcPort(backend.getBrpcPort());
+            PTabletsLocation location = 
locationBuilder.addAllTabletId(tabletIds).build();
+            locations.add(location);
+        }
+        PFetchRemoteSchemaRequest.Builder requestBuilder = 
PFetchRemoteSchemaRequest.newBuilder()
+                                                                    
.addAllTabletLocation(locations)
+                                                                    
.setIsCoordinator(true);
+        // 3. Send rpc to coordinatorBackend util succeed or retry
+        for (Backend be : coordinatorBackend) {
+            try {
+                PFetchRemoteSchemaRequest request = requestBuilder.build();
+                Future<PFetchRemoteSchemaResponse> future = 
BackendServiceProxy.getInstance()
+                                            
.fetchRemoteTabletSchemaAsync(be.getBrpcAddress(), request);
+                PFetchRemoteSchemaResponse response = null;
+                try {
+                    response = future.get(60, TimeUnit.SECONDS);
+                    TStatusCode code = 
TStatusCode.findByValue(response.getStatus().getStatusCode());
+                    String errMsg;
+                    if (code != TStatusCode.OK) {
+                        if 
(!response.getStatus().getErrorMsgsList().isEmpty()) {
+                            errMsg = 
response.getStatus().getErrorMsgsList().get(0);
+                        } else {
+                            errMsg = "fetchRemoteTabletSchemaAsync failed. 
backend address: "
+                                    + be.getHost() + " : " + be.getBrpcPort();
+                        }
+                        throw new RpcException(be.getHost(), errMsg);
+                    }
+                    fillColumns(response);
+                    return tableColumns;
+                } catch (AnalysisException e) {
+                    // continue to get result
+                    LOG.warn(e);
+                } catch (InterruptedException e) {
+                    // continue to get result
+                    LOG.warn("fetch remote schema future get interrupted 
Exception");
+                } catch (TimeoutException e) {
+                    future.cancel(true);
+                    // continue to get result
+                    LOG.warn("fetch remote schema result timeout, addr {}", 
be.getBrpcAddress());
+                }
+            }    catch (RpcException e) {
+                LOG.warn("fetch remote schema result rpc exception {}, e {}", 
be.getBrpcAddress(), e);
+            } catch (ExecutionException e) {
+                LOG.warn("fetch remote schema ExecutionException, addr {}, e 
{}", be.getBrpcAddress(), e);
+            }
+        }
+        return tableColumns;
+    }
+
+    private void fillColumns(PFetchRemoteSchemaResponse response) throws 
AnalysisException {
+        TabletSchemaPB schemaPB = response.getMergedSchema();
+        for (ColumnPB columnPB : schemaPB.getColumnList()) {
+            try {
+                Column remoteColumn = initColumnFromPB(columnPB);
+                tableColumns.add(remoteColumn);
+            } catch (Exception e) {
+                throw new AnalysisException("column default value to string 
failed");
+            }
+        }
+        int variantColumntIdx = 0;
+        for (Column column : tableColumns) {
+            variantColumntIdx++;
+            if (column.getType().isVariantType()) {
+                break;
+            }
+        }
+        if (variantColumntIdx == tableColumns.size()) {
+            return;
+        }
+        List<Column> subList = tableColumns.subList(variantColumntIdx, 
tableColumns.size());
+        Collections.sort(subList, new Comparator<Column>() {
+            @Override
+            public int compare(Column c1, Column c2) {
+                return c1.getName().compareTo(c2.getName());
+            }
+        });
+    }
+
+    private Column initColumnFromPB(ColumnPB column) throws AnalysisException {
+        try {
+            AggregateType aggType = getAggTypeFromAggName(column.getAggregation());

Review Comment:
   Normal columns are processed here in a separate way. Can we handle only the variant columns specially and let the other columns go through the normal DESC logic?
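   Roughly what I have in mind at the call site; `schema` and `tablets` here stand for whatever the DESC code already has, and treating "unknown to the FE" as "extracted sub-column" is just an assumption for this sketch:
   ```java
   // ordinary columns keep the FE schema and the normal DESC formatting;
   // only columns the FE does not know about (the variant sub-columns) come from the BE
   List<Column> remoteColumns = new FetchRemoteTabletSchemaUtil(tablets).fetch();
   Set<String> feColumnNames = Sets.newHashSet();
   for (Column column : schema) {
       feColumnNames.add(column.getName());
   }
   List<Column> fullSchema = Lists.newArrayList(schema);
   for (Column column : remoteColumns) {
       if (!feColumnNames.contains(column.getName())) {
           fullSchema.add(column);
       }
   }
   ```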



##########
be/src/service/internal_service.cpp:
##########
@@ -852,6 +855,110 @@ void PInternalServiceImpl::_get_column_ids_by_tablet_ids(
     response->mutable_status()->set_status_code(TStatusCode::OK);
 }
 
+template <class RPCResponse>
+struct AsyncRPCContext {
+    RPCResponse response;
+    brpc::Controller cntl;
+    brpc::CallId cid;
+};
+
+void 
PInternalServiceImpl::fetch_remote_tablet_schema(google::protobuf::RpcController*
 controller,
+                                                      const 
PFetchRemoteSchemaRequest* request,
+                                                      
PFetchRemoteSchemaResponse* response,
+                                                      
google::protobuf::Closure* done) {
+    bool ret = _heavy_work_pool.try_offer([request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        Status st = Status::OK();
+        if (request->is_coordinator()) {
+            // Spawn rpc request to none coordinator nodes, and finally merge 
them all
+            PFetchRemoteSchemaRequest remote_request(*request);
+            // set it none coordinator to get merged schema
+            remote_request.set_is_coordinator(false);
+            using PFetchRemoteTabletSchemaRpcContext = 
AsyncRPCContext<PFetchRemoteSchemaResponse>;
+            std::vector<PFetchRemoteTabletSchemaRpcContext> rpc_contexts(
+                    request->tablet_location_size());
+            for (int i = 0; i < request->tablet_location_size(); ++i) {
+                std::string host = request->tablet_location(i).host();
+                int32_t brpc_port = request->tablet_location(i).brpc_port();
+                std::shared_ptr<PBackendService_Stub> stub(
+                        
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
+                                host, brpc_port));
+                rpc_contexts[i].cid = rpc_contexts[i].cntl.call_id();
+                stub->fetch_remote_tablet_schema(&rpc_contexts[i].cntl, 
&remote_request,
+                                                 &rpc_contexts[i].response, 
brpc::DoNothing());
+            }
+            std::vector<TabletSchemaSPtr> schemas;
+            for (auto& rpc_context : rpc_contexts) {
+                brpc::Join(rpc_context.cid);
+                if (!st.ok()) {
+                    // make sure all flying rpc request is joined
+                    continue;
+                }
+                if (rpc_context.cntl.Failed()) {
+                    LOG(WARNING) << "fetch_remote_tablet_schema rpc err:"
+                                 << rpc_context.cntl.ErrorText();
+                    
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
+                            rpc_context.cntl.remote_side());
+                    st = Status::InternalError("fetch_remote_tablet_schema rpc 
err: {}",
+                                               rpc_context.cntl.ErrorText());
+                }
+                if (rpc_context.response.status().status_code() != 0) {
+                    st = Status::create(rpc_context.response.status());
+                }
+                if (rpc_context.response.has_merged_schema()) {
+                    TabletSchemaSPtr schema = std::make_shared<TabletSchema>();
+                    schema->init_from_pb(rpc_context.response.merged_schema());
+                    schemas.push_back(schema);
+                }
+            }
+            if (!schemas.empty() && st.ok()) {
+                // merge all
+                TabletSchemaSPtr merged_schema =
+                        
vectorized::schema_util::get_least_common_schema(schemas, nullptr);
+                VLOG_DEBUG << "dump schema:" << 
merged_schema->dump_structure();
+                merged_schema->to_schema_pb(response->mutable_merged_schema());
+            }
+            st.to_protobuf(response->mutable_status());
+            return;
+        }
+
+        // This is not a coordinator, get it's tablet and merge schema
+        std::vector<int64_t> target_tablets;
+        for (int i = 0; i < request->tablet_location_size(); ++i) {
+            const auto& location = request->tablet_location(i);
+            auto backend = BackendOptions::get_local_backend();
+            // If this is the target backend
+            if (backend.host == location.host() && config::brpc_port == 
location.brpc_port()) {
+                target_tablets.assign(location.tablet_id().begin(), 
location.tablet_id().end());
+                break;
+            }
+        }
+        if (!target_tablets.empty()) {
+            std::vector<TabletSchemaSPtr> tablet_schemas;
+            for (int64_t tablet_id : target_tablets) {
+                TabletSharedPtr tablet =
+                        StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, false);
+                if (tablet == nullptr) {
+                    // just ignore

Review Comment:
   Please at least add a WARNING log here.



##########
be/src/olap/rowset_builder.cpp:
##########
@@ -362,7 +362,19 @@ void RowsetBuilder::_build_current_tablet_schema(int64_t index_id,
                                                     indexes[i], ori_tablet_schema);
     }
    if (_tablet_schema->schema_version() > ori_tablet_schema.schema_version()) {
-        _tablet->update_max_version_schema(_tablet_schema);
+        // After schema change, should include extracted column

Review Comment:
   Is this code related to the PR?



##########
fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java:
##########
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.AggregateType;
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MapType;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.proto.InternalService.PFetchRemoteSchemaRequest;
+import org.apache.doris.proto.InternalService.PFetchRemoteSchemaResponse;
+import org.apache.doris.proto.InternalService.PTabletsLocation;
+import org.apache.doris.proto.OlapFile.ColumnPB;
+import org.apache.doris.proto.OlapFile.TabletSchemaPB;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+// This class is used to pull the specified tablets' columns existing on the 
Backend (BE)
+// including regular columns and columns decomposed by variants
+public class FetchRemoteTabletSchemaUtil {
+    private static final Logger LOG = 
LogManager.getLogger(FetchRemoteTabletSchemaUtil.class);
+
+    private List<Tablet> remoteTablets;
+    private List<Column> tableColumns;
+
+    public FetchRemoteTabletSchemaUtil(List<Tablet> tablets) {
+        this.remoteTablets = tablets;
+        this.tableColumns = Lists.newArrayList();
+    }
+
+    public List<Column> fetch() {
+        // 1. Find which Backend (BE) servers the tablets are on
+        Preconditions.checkNotNull(remoteTablets);
+        Map<Long, Set<Long>> beIdToTabletId = Maps.newHashMap();
+        for (Tablet tablet : remoteTablets) {
+            for (Replica replica : tablet.getReplicas()) {
+                // only need alive replica
+                if (replica.isAlive()) {
+                    Set<Long> tabletIds = beIdToTabletId.computeIfAbsent(
+                                    replica.getBackendId(), k -> 
Sets.newHashSet());
+                    tabletIds.add(tablet.getId());
+                }
+            }
+        }
+
+        // 2. Randomly select 2 Backend (BE) servers to act as coordinators.
+        // Coordinator BE is responsible for collecting all table columns and 
returning to the FE.
+        // Two BE provide a retry opportunity with the second one in case the 
first attempt fails.
+        List<PTabletsLocation> locations = Lists.newArrayList();
+        List<Backend> coordinatorBackend = Lists.newArrayList();
+        for (Map.Entry<Long, Set<Long>> entry : beIdToTabletId.entrySet()) {
+            Long backendId = entry.getKey();
+            Set<Long> tabletIds = entry.getValue();
+            Backend backend = 
Env.getCurrentEnv().getCurrentSystemInfo().getBackend(backendId);
+            // only need alive be
+            if (!backend.isAlive()) {
+                continue;
+            }
+            // need 2 be to provide a retry
+            if (coordinatorBackend.size() < 2) {
+                coordinatorBackend.add(backend);
+            }
+            PTabletsLocation.Builder locationBuilder = 
PTabletsLocation.newBuilder()
+                                                        
.setHost(backend.getHost())
+                                                        
.setBrpcPort(backend.getBrpcPort());
+            PTabletsLocation location = 
locationBuilder.addAllTabletId(tabletIds).build();
+            locations.add(location);
+        }
+        PFetchRemoteSchemaRequest.Builder requestBuilder = 
PFetchRemoteSchemaRequest.newBuilder()
+                                                                    
.addAllTabletLocation(locations)
+                                                                    
.setIsCoordinator(true);
+        // 3. Send rpc to coordinatorBackend util succeed or retry
+        for (Backend be : coordinatorBackend) {
+            try {
+                PFetchRemoteSchemaRequest request = requestBuilder.build();
+                Future<PFetchRemoteSchemaResponse> future = 
BackendServiceProxy.getInstance()
+                                            
.fetchRemoteTabletSchemaAsync(be.getBrpcAddress(), request);
+                PFetchRemoteSchemaResponse response = null;
+                try {
+                    response = future.get(60, TimeUnit.SECONDS);
+                    TStatusCode code = 
TStatusCode.findByValue(response.getStatus().getStatusCode());
+                    String errMsg;
+                    if (code != TStatusCode.OK) {
+                        if 
(!response.getStatus().getErrorMsgsList().isEmpty()) {
+                            errMsg = 
response.getStatus().getErrorMsgsList().get(0);
+                        } else {
+                            errMsg = "fetchRemoteTabletSchemaAsync failed. 
backend address: "
+                                    + be.getHost() + " : " + be.getBrpcPort();
+                        }
+                        throw new RpcException(be.getHost(), errMsg);
+                    }
+                    fillColumns(response);
+                    return tableColumns;
+                } catch (AnalysisException e) {
+                    // continue to get result
+                    LOG.warn(e);
+                } catch (InterruptedException e) {
+                    // continue to get result
+                    LOG.warn("fetch remote schema future get interrupted 
Exception");
+                } catch (TimeoutException e) {
+                    future.cancel(true);
+                    // continue to get result
+                    LOG.warn("fetch remote schema result timeout, addr {}", 
be.getBrpcAddress());
+                }
+            }    catch (RpcException e) {
+                LOG.warn("fetch remote schema result rpc exception {}, e {}", 
be.getBrpcAddress(), e);
+            } catch (ExecutionException e) {
+                LOG.warn("fetch remote schema ExecutionException, addr {}, e 
{}", be.getBrpcAddress(), e);
+            }
+        }
+        return tableColumns;
+    }
+
+    private void fillColumns(PFetchRemoteSchemaResponse response) throws 
AnalysisException {
+        TabletSchemaPB schemaPB = response.getMergedSchema();
+        for (ColumnPB columnPB : schemaPB.getColumnList()) {
+            try {
+                Column remoteColumn = initColumnFromPB(columnPB);
+                tableColumns.add(remoteColumn);
+            } catch (Exception e) {
+                throw new AnalysisException("column default value to string 
failed");
+            }
+        }
+        int variantColumntIdx = 0;
+        for (Column column : tableColumns) {
+            variantColumntIdx++;
+            if (column.getType().isVariantType()) {
+                break;
+            }
+        }
+        if (variantColumntIdx == tableColumns.size()) {
+            return;
+        }
+        List<Column> subList = tableColumns.subList(variantColumntIdx, 
tableColumns.size());
+        Collections.sort(subList, new Comparator<Column>() {
+            @Override
+            public int compare(Column c1, Column c2) {
+                return c1.getName().compareTo(c2.getName());
+            }
+        });
+    }
+
+    private Column initColumnFromPB(ColumnPB column) throws AnalysisException {
+        try {
+            AggregateType aggType = getAggTypeFromAggName(column.getAggregation());
+            Type type = getTypeFromTypeName(column.getType());
+            String columnName = column.getName();
+            boolean isKey = column.getIsKey();
+            boolean isNullable = column.getIsNullable();
+            String defaultValue = column.getDefaultValue().toString("UTF-8");
+            if (defaultValue.equals("")) {
+                defaultValue = null;
+            }
+            if (isKey) {
+                aggType = null;
+            }
+            do {
+                if (type.isArrayType()) {
+                    List<ColumnPB> childColumn = 
column.getChildrenColumnsList();
+                    if (childColumn == null || childColumn.size() != 1) {
+                        break;
+                    }
+                    Column child = initColumnFromPB(childColumn.get(0));
+                    type = new ArrayType(child.getType());
+                } else if (type.isMapType()) {
+                    List<ColumnPB> childColumn = 
column.getChildrenColumnsList();
+                    if (childColumn == null || childColumn.size() != 2) {
+                        break;
+                    }
+                    Column keyChild = initColumnFromPB(childColumn.get(0));
+                    Column valueChild = initColumnFromPB(childColumn.get(1));
+                    type = new MapType(keyChild.getType(), 
valueChild.getType());
+                } else if (type.isStructType()) {
+                    List<ColumnPB> childColumn = 
column.getChildrenColumnsList();
+                    if (childColumn == null) {
+                        break;
+                    }
+                    List<Type> childTypes = Lists.newArrayList();
+                    for (ColumnPB childPB : childColumn) {
+                        childTypes.add(initColumnFromPB(childPB).getType());
+                    }
+                    type = new StructType(childTypes);
+                }
+            } while (false);
+            return new Column(columnName, type, isKey, aggType, isNullable,
+                                                    defaultValue, "remote schema");
+        } catch (Exception e) {
+            throw new AnalysisException("default value to string failed");
+        }
+    }
+
+    private Type getTypeFromTypeName(String typeName) {

Review Comment:
   An if-else chain does not scale well when new types are added; please find a better way to map a type name to a Type.
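   For example, a lookup table registered once (or an existing name-to-type mapping, if the FE already has one) would keep this from growing with every new type. The entries below are only illustrative, assume the BE-reported names match, and need an ImmutableMap import:
   ```java
   private static final Map<String, Type> TYPE_BY_NAME = ImmutableMap.<String, Type>builder()
           .put("TINYINT", Type.TINYINT)
           .put("SMALLINT", Type.SMALLINT)
           .put("INT", Type.INT)
           .put("BIGINT", Type.BIGINT)
           .put("LARGEINT", Type.LARGEINT)
           .put("FLOAT", Type.FLOAT)
           .put("DOUBLE", Type.DOUBLE)
           .put("BOOLEAN", Type.BOOLEAN)
           .put("STRING", Type.STRING)
           // remaining scalar types registered here once
           .build();

   private Type getTypeFromTypeName(String typeName) {
       // ARRAY/MAP/STRUCT still need their children handled as in initColumnFromPB
       Type type = TYPE_BY_NAME.get(typeName.toUpperCase());
       return type == null ? Type.INVALID : type;
   }
   ```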



##########
be/src/olap/tablet_schema.cpp:
##########
@@ -1019,6 +1019,23 @@ bool TabletSchema::is_dropped_column(const TabletColumn& col) const {
            column(col.name()).unique_id() != col.unique_id();
 }
 
+void TabletSchema::copy_extracted_columns(const TabletSchema& src_schema) {
+    std::unordered_set<int32_t> variant_columns;
+    for (const auto& col : columns()) {
+        if (col.is_variant_type()) {
+            variant_columns.insert(col.unique_id());
+        }
+    }
+    for (const TabletColumn& col : src_schema.columns()) {
+        if (col.is_extracted_column() && variant_columns.contains(col.parent_unique_id())) {
+            ColumnPB col_pb;
+            col.to_schema_pb(&col_pb);
+            TabletColumn new_col(col_pb);
+            append_column(new_col, ColumnType::VARIANT);

Review Comment:
   If new_col already exists in the TabletSchema, will this create a duplicate column?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
