eldenmoon commented on code in PR #27764: URL: https://github.com/apache/doris/pull/27764#discussion_r1410111406
########## regression-test/suites/variant_p0/desc.groovy: ########## @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("regression_test_variant_desc", "variant_type_desc"){ + Review Comment: rename `variant_type_desc` as `nonConcurrent` ########## fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java: ########## @@ -0,0 +1,323 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common.util; + +import org.apache.doris.catalog.AggregateType; +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.proto.InternalService.PFetchRemoteSchemaRequest; +import org.apache.doris.proto.InternalService.PFetchRemoteSchemaResponse; +import org.apache.doris.proto.InternalService.PTabletsLocation; +import org.apache.doris.proto.OlapFile.ColumnPB; +import org.apache.doris.proto.OlapFile.TabletSchemaPB; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TStatusCode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + + +public class FetchRemoteTabletSchemaUtil { + private static final Logger LOG = LogManager.getLogger(FetchRemoteTabletSchemaUtil.class); + + private List<Tablet> tablets; + private List<Column> columns; + + public FetchRemoteTabletSchemaUtil(List<Tablet> tablets) { + this.tablets = tablets; + this.columns = Lists.newArrayList(); + } + + public List<Column> fetch() { + // find be + Preconditions.checkNotNull(tablets); + Map<Long, Set<Long>> beIdToTabletId = 
Maps.newHashMap(); + for (Tablet tablet : tablets) { + for (Replica replica : tablet.getReplicas()) { + Set<Long> tabletIds = beIdToTabletId.computeIfAbsent( + replica.getBackendId(), k -> Sets.newHashSet()); + tabletIds.add(tablet.getId()); + } + } + + // build PTabletsLocation + List<PTabletsLocation> locations = Lists.newArrayList(); + List<Backend> coordinatorBackend = Lists.newArrayList(); + for (Map.Entry<Long, Set<Long>> entry : beIdToTabletId.entrySet()) { + Long backendId = entry.getKey(); + Set<Long> tabletIds = entry.getValue(); + Backend backend = Env.getCurrentEnv().getCurrentSystemInfo().getBackend(backendId); + if (coordinatorBackend.size() < 2) { + coordinatorBackend.add(backend); + } + PTabletsLocation.Builder locationBuilder = PTabletsLocation.newBuilder() + .setHost(backend.getHost()) + .setBrpcPort(backend.getBrpcPort()); + PTabletsLocation location = locationBuilder.addAllTabletId(tabletIds).build(); + locations.add(location); + } + PFetchRemoteSchemaRequest.Builder requestBuilder = PFetchRemoteSchemaRequest.newBuilder() + .addAllTabletLocation(locations) + .setIsCoordinator(true); + // send rpc to coordinatorBackend util succeed or 2 times + for (Backend be : coordinatorBackend) { + try { + PFetchRemoteSchemaRequest request = requestBuilder.build(); + Future<PFetchRemoteSchemaResponse> future = BackendServiceProxy.getInstance() + .fetchRemoteTabletSchemaAsync(be.getBrpcAddress(), request); + PFetchRemoteSchemaResponse response = null; + try { + response = future.get(60, TimeUnit.SECONDS); + TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); + String errMsg; + if (code != TStatusCode.OK) { + if (!response.getStatus().getErrorMsgsList().isEmpty()) { + errMsg = response.getStatus().getErrorMsgsList().get(0); + } else { + errMsg = "fetchRemoteTabletSchemaAsync failed. 
backend address: " + + be.getHost() + " : " + be.getBrpcPort(); + } + throw new AnalysisException(errMsg); + } + fillColumns(response); + break; + } catch (AnalysisException e) { + // continue to get result + LOG.warn(e); Review Comment: why continue in `AnalysisException`? ########## fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java: ########## @@ -0,0 +1,323 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common.util; + +import org.apache.doris.catalog.AggregateType; +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.proto.InternalService.PFetchRemoteSchemaRequest; +import org.apache.doris.proto.InternalService.PFetchRemoteSchemaResponse; +import org.apache.doris.proto.InternalService.PTabletsLocation; +import org.apache.doris.proto.OlapFile.ColumnPB; +import org.apache.doris.proto.OlapFile.TabletSchemaPB; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TStatusCode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + + +public class FetchRemoteTabletSchemaUtil { + private static final Logger LOG = LogManager.getLogger(FetchRemoteTabletSchemaUtil.class); + + private List<Tablet> tablets; + private List<Column> columns; + + public FetchRemoteTabletSchemaUtil(List<Tablet> tablets) { + this.tablets = tablets; + this.columns = Lists.newArrayList(); + } + + public List<Column> fetch() { + // find be + Preconditions.checkNotNull(tablets); + Map<Long, Set<Long>> beIdToTabletId = 
Maps.newHashMap(); + for (Tablet tablet : tablets) { + for (Replica replica : tablet.getReplicas()) { + Set<Long> tabletIds = beIdToTabletId.computeIfAbsent( Review Comment: find alive replica ########## fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java: ########## @@ -0,0 +1,323 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common.util; + +import org.apache.doris.catalog.AggregateType; +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.proto.InternalService.PFetchRemoteSchemaRequest; +import org.apache.doris.proto.InternalService.PFetchRemoteSchemaResponse; +import org.apache.doris.proto.InternalService.PTabletsLocation; +import org.apache.doris.proto.OlapFile.ColumnPB; +import org.apache.doris.proto.OlapFile.TabletSchemaPB; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TStatusCode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + + +public class FetchRemoteTabletSchemaUtil { + private static final Logger LOG = LogManager.getLogger(FetchRemoteTabletSchemaUtil.class); + + private List<Tablet> tablets; + private List<Column> columns; + + public FetchRemoteTabletSchemaUtil(List<Tablet> tablets) { + this.tablets = tablets; + this.columns = Lists.newArrayList(); + } + + public List<Column> fetch() { + // find be + Preconditions.checkNotNull(tablets); + Map<Long, Set<Long>> beIdToTabletId = 
Maps.newHashMap(); + for (Tablet tablet : tablets) { + for (Replica replica : tablet.getReplicas()) { + Set<Long> tabletIds = beIdToTabletId.computeIfAbsent( + replica.getBackendId(), k -> Sets.newHashSet()); + tabletIds.add(tablet.getId()); + } + } + + // build PTabletsLocation + List<PTabletsLocation> locations = Lists.newArrayList(); + List<Backend> coordinatorBackend = Lists.newArrayList(); + for (Map.Entry<Long, Set<Long>> entry : beIdToTabletId.entrySet()) { + Long backendId = entry.getKey(); + Set<Long> tabletIds = entry.getValue(); + Backend backend = Env.getCurrentEnv().getCurrentSystemInfo().getBackend(backendId); + if (coordinatorBackend.size() < 2) { + coordinatorBackend.add(backend); + } + PTabletsLocation.Builder locationBuilder = PTabletsLocation.newBuilder() + .setHost(backend.getHost()) + .setBrpcPort(backend.getBrpcPort()); + PTabletsLocation location = locationBuilder.addAllTabletId(tabletIds).build(); + locations.add(location); + } + PFetchRemoteSchemaRequest.Builder requestBuilder = PFetchRemoteSchemaRequest.newBuilder() + .addAllTabletLocation(locations) + .setIsCoordinator(true); + // send rpc to coordinatorBackend util succeed or 2 times + for (Backend be : coordinatorBackend) { + try { Review Comment: need backoff retry ########## regression-test/suites/variant_p0/desc.groovy: ########## @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("regression_test_variant_desc", "variant_type_desc"){ + + def load_json_data = {table_name, file_name -> + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'read_json_by_line', 'true' + set 'format', 'json' + set 'max_filter_ratio', '0.1' + file file_name // import json file + time 10000 // limit inflight 10s + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + logger.info("Stream load ${file_name} result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + def create_table = { table_name, buckets="auto" -> + sql "DROP TABLE IF EXISTS ${table_name}" + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + k bigint, + v variant + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS ${buckets} + properties("replication_num" = "1", "disable_auto_compaction" = "false"); + """ + } + + def create_table_partition = { table_name, buckets="auto" -> + sql "DROP TABLE IF EXISTS ${table_name}" + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + k bigint, + v variant + ) + DUPLICATE KEY(`k`) + PARTITION BY RANGE(k) + ( + PARTITION p1 VALUES LESS THAN (3000), + PARTITION p2 VALUES LESS THAN (50000), + 
PARTITION p3 VALUES LESS THAN (100000) + ) + DISTRIBUTED BY HASH(k) BUCKETS ${buckets} + properties("replication_num" = "1", "disable_auto_compaction" = "false"); + """ + } + + def set_be_config = { key, value -> + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + + try { + // sparse columns + def table_name = "sparse_columns" + create_table table_name + set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95") + sql """set describe_extend_variant_column = true""" + sql """insert into sparse_columns select 0, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str Review Comment: add a Chinese column name and a Unicode column to test whether they are output correctly ########## regression-test/suites/variant_p0/desc.groovy: ########## @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("regression_test_variant_desc", "variant_type_desc"){ + + def load_json_data = {table_name, file_name -> + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'read_json_by_line', 'true' + set 'format', 'json' + set 'max_filter_ratio', '0.1' + file file_name // import json file + time 10000 // limit inflight 10s + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + logger.info("Stream load ${file_name} result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + def create_table = { table_name, buckets="auto" -> + sql "DROP TABLE IF EXISTS ${table_name}" + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( Review Comment: add multi-variant cases. Extra variant columns could be added via the schema change `add column` ########## fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java: ########## @@ -0,0 +1,323 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import org.apache.doris.catalog.AggregateType; +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.proto.InternalService.PFetchRemoteSchemaRequest; +import org.apache.doris.proto.InternalService.PFetchRemoteSchemaResponse; +import org.apache.doris.proto.InternalService.PTabletsLocation; +import org.apache.doris.proto.OlapFile.ColumnPB; +import org.apache.doris.proto.OlapFile.TabletSchemaPB; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TStatusCode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + + +public class 
FetchRemoteTabletSchemaUtil { + private static final Logger LOG = LogManager.getLogger(FetchRemoteTabletSchemaUtil.class); + + private List<Tablet> tablets; + private List<Column> columns; + + public FetchRemoteTabletSchemaUtil(List<Tablet> tablets) { + this.tablets = tablets; + this.columns = Lists.newArrayList(); + } + + public List<Column> fetch() { + // find be + Preconditions.checkNotNull(tablets); + Map<Long, Set<Long>> beIdToTabletId = Maps.newHashMap(); + for (Tablet tablet : tablets) { + for (Replica replica : tablet.getReplicas()) { + Set<Long> tabletIds = beIdToTabletId.computeIfAbsent( + replica.getBackendId(), k -> Sets.newHashSet()); + tabletIds.add(tablet.getId()); + } + } + + // build PTabletsLocation + List<PTabletsLocation> locations = Lists.newArrayList(); + List<Backend> coordinatorBackend = Lists.newArrayList(); + for (Map.Entry<Long, Set<Long>> entry : beIdToTabletId.entrySet()) { + Long backendId = entry.getKey(); + Set<Long> tabletIds = entry.getValue(); + Backend backend = Env.getCurrentEnv().getCurrentSystemInfo().getBackend(backendId); + if (coordinatorBackend.size() < 2) { Review Comment: `< 2` is meaningless; make it clearer, or add a comment ########## fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java: ########## @@ -0,0 +1,323 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import org.apache.doris.catalog.AggregateType; +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.proto.InternalService.PFetchRemoteSchemaRequest; +import org.apache.doris.proto.InternalService.PFetchRemoteSchemaResponse; +import org.apache.doris.proto.InternalService.PTabletsLocation; +import org.apache.doris.proto.OlapFile.ColumnPB; +import org.apache.doris.proto.OlapFile.TabletSchemaPB; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TStatusCode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + + +public class 
FetchRemoteTabletSchemaUtil { + private static final Logger LOG = LogManager.getLogger(FetchRemoteTabletSchemaUtil.class); + + private List<Tablet> tablets; + private List<Column> columns; + + public FetchRemoteTabletSchemaUtil(List<Tablet> tablets) { + this.tablets = tablets; + this.columns = Lists.newArrayList(); + } + + public List<Column> fetch() { + // find be + Preconditions.checkNotNull(tablets); + Map<Long, Set<Long>> beIdToTabletId = Maps.newHashMap(); + for (Tablet tablet : tablets) { + for (Replica replica : tablet.getReplicas()) { + Set<Long> tabletIds = beIdToTabletId.computeIfAbsent( + replica.getBackendId(), k -> Sets.newHashSet()); + tabletIds.add(tablet.getId()); + } + } + + // build PTabletsLocation + List<PTabletsLocation> locations = Lists.newArrayList(); + List<Backend> coordinatorBackend = Lists.newArrayList(); + for (Map.Entry<Long, Set<Long>> entry : beIdToTabletId.entrySet()) { + Long backendId = entry.getKey(); + Set<Long> tabletIds = entry.getValue(); Review Comment: find alive backend as coordinator -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org