This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new fbc361ec7c4 branch-2.1: [opt](arrow-flight-sql) Support 
`arrow-flight-sql` protocol `getStreamCatalogs`, `getStreamSchemas`, 
`getStreamTables` #46217 (#46268)
fbc361ec7c4 is described below

commit fbc361ec7c4b3a380f6cff23d376989ca6831a8c
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Jan 2 15:07:04 2025 +0800

    branch-2.1: [opt](arrow-flight-sql) Support `arrow-flight-sql` protocol 
`getStreamCatalogs`, `getStreamSchemas`, `getStreamTables` #46217 (#46268)
    
    Cherry-picked from #46217
    
    Co-authored-by: Xinyi Zou <zouxi...@selectdb.com>
---
 .../arrowflight/DorisFlightSqlProducer.java        |  71 +++-
 .../service/arrowflight/FlightSqlSchemaHelper.java | 395 +++++++++++++++++++++
 .../service/arrowflight/auth2/FlightAuthUtils.java |   7 +-
 3 files changed, 460 insertions(+), 13 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
index b968ab04c57..154fd9f0b6b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
@@ -131,21 +131,19 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
         String[] handleParts = handle.split(":");
         String executedPeerIdentity = handleParts[0];
         String queryId = handleParts[1];
+        // The tokens used for authentication between getStreamStatement and 
getFlightInfoStatement are different.
         ConnectContext connectContext = 
flightSessionsManager.getConnectContext(executedPeerIdentity);
         try {
-            // The tokens used for authentication between getStreamStatement 
and getFlightInfoStatement are different.
             final FlightSqlResultCacheEntry flightSqlResultCacheEntry = 
Objects.requireNonNull(
                     connectContext.getFlightSqlChannel().getResult(queryId));
             final VectorSchemaRoot vectorSchemaRoot = 
flightSqlResultCacheEntry.getVectorSchemaRoot();
             listener.start(vectorSchemaRoot);
             listener.putNext();
         } catch (Exception e) {
-            listener.error(e);
             String errMsg = "get stream statement failed, " + e.getMessage() + 
", " + Util.getRootCauseMessage(e)
                     + ", error code: " + 
connectContext.getState().getErrorCode() + ", error msg: "
                     + connectContext.getState().getErrorMessage();
-            LOG.warn(errMsg, e);
-            throw 
CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
+            handleStreamException(e, errMsg, listener);
         } finally {
             listener.completed();
             // The result has been sent or sent failed, delete it.
@@ -280,7 +278,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
             String errMsg = "get flight info statement failed, " + 
e.getMessage() + ", " + Util.getRootCauseMessage(e)
                     + ", error code: " + 
connectContext.getState().getErrorCode() + ", error msg: "
                     + connectContext.getState().getErrorMessage();
-            LOG.warn(errMsg, e);
+            LOG.error(errMsg, e);
             throw 
CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
         } finally {
             connectContext.setCommand(MysqlCommand.COM_SLEEP);
@@ -361,7 +359,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
                 String errMsg = "create prepared statement failed, " + 
e.getMessage() + ", " + Util.getRootCauseMessage(
                         e) + ", error code: " + 
connectContext.getState().getErrorCode() + ", error msg: "
                         + connectContext.getState().getErrorMessage();
-                LOG.warn(errMsg, e);
+                LOG.error(errMsg, e);
                 
listener.onError(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException());
                 return;
             } catch (final Throwable t) {
@@ -407,7 +405,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
             } catch (Exception e) {
                 String errMsg = "acceptPutPreparedStatementUpdate failed, " + 
e.getMessage() + ", "
                         + Util.getRootCauseMessage(e);
-                LOG.warn(errMsg, e);
+                LOG.error(errMsg, e);
                 throw 
CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
             }
         };
@@ -451,7 +449,21 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
 
     @Override
     public void getStreamCatalogs(final CallContext context, final 
ServerStreamListener listener) {
-        throw CallStatus.UNIMPLEMENTED.withDescription("getStreamCatalogs 
unimplemented").toRuntimeException();
+        try {
+            ConnectContext connectContext = 
flightSessionsManager.getConnectContext(context.peerIdentity());
+            FlightSqlSchemaHelper flightSqlSchemaHelper = new 
FlightSqlSchemaHelper(connectContext);
+            final Schema schema = Schemas.GET_CATALOGS_SCHEMA;
+
+            try (final VectorSchemaRoot vectorSchemaRoot = 
VectorSchemaRoot.create(schema, rootAllocator)) {
+                listener.start(vectorSchemaRoot);
+                vectorSchemaRoot.allocateNew();
+                flightSqlSchemaHelper.getCatalogs(vectorSchemaRoot);
+                listener.putNext();
+                listener.completed();
+            }
+        } catch (final Exception e) {
+            handleStreamException(e, "", listener);
+        }
     }
 
     @Override
@@ -463,7 +475,22 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
     @Override
     public void getStreamSchemas(final CommandGetDbSchemas command, final 
CallContext context,
             final ServerStreamListener listener) {
-        throw CallStatus.UNIMPLEMENTED.withDescription("getStreamSchemas 
unimplemented").toRuntimeException();
+        try {
+            ConnectContext connectContext = 
flightSessionsManager.getConnectContext(context.peerIdentity());
+            FlightSqlSchemaHelper flightSqlSchemaHelper = new 
FlightSqlSchemaHelper(connectContext);
+            flightSqlSchemaHelper.setParameterForGetDbSchemas(command);
+            final Schema schema = Schemas.GET_SCHEMAS_SCHEMA;
+
+            try (VectorSchemaRoot vectorSchemaRoot = 
VectorSchemaRoot.create(schema, rootAllocator)) {
+                listener.start(vectorSchemaRoot);
+                vectorSchemaRoot.allocateNew();
+                flightSqlSchemaHelper.getSchemas(vectorSchemaRoot);
+                listener.putNext();
+                listener.completed();
+            }
+        } catch (final Exception e) {
+            handleStreamException(e, "", listener);
+        }
     }
 
     @Override
@@ -479,7 +506,23 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
     @Override
     public void getStreamTables(final CommandGetTables command, final 
CallContext context,
             final ServerStreamListener listener) {
-        throw CallStatus.UNIMPLEMENTED.withDescription("getStreamTables 
unimplemented").toRuntimeException();
+        try {
+            ConnectContext connectContext = 
flightSessionsManager.getConnectContext(context.peerIdentity());
+            FlightSqlSchemaHelper flightSqlSchemaHelper = new 
FlightSqlSchemaHelper(connectContext);
+            flightSqlSchemaHelper.setParameterForGetTables(command);
+            final Schema schema = command.getIncludeSchema() ? 
Schemas.GET_TABLES_SCHEMA
+                    : Schemas.GET_TABLES_SCHEMA_NO_SCHEMA;
+
+            try (VectorSchemaRoot vectorSchemaRoot = 
VectorSchemaRoot.create(schema, rootAllocator)) {
+                listener.start(vectorSchemaRoot);
+                vectorSchemaRoot.allocateNew();
+                flightSqlSchemaHelper.getTables(vectorSchemaRoot);
+                listener.putNext();
+                listener.completed();
+            }
+        } catch (final Exception e) {
+            handleStreamException(e, "", listener);
+        }
     }
 
     @Override
@@ -502,7 +545,6 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
     @Override
     public void getStreamPrimaryKeys(final CommandGetPrimaryKeys command, 
final CallContext context,
             final ServerStreamListener listener) {
-
         throw CallStatus.UNIMPLEMENTED.withDescription("getStreamPrimaryKeys 
unimplemented").toRuntimeException();
     }
 
@@ -545,9 +587,14 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
     private <T extends Message> FlightInfo getFlightInfoForSchema(final T 
request, final FlightDescriptor descriptor,
             final Schema schema) {
         final Ticket ticket = new Ticket(Any.pack(request).toByteArray());
-        // TODO Support multiple endpoints.
         final List<FlightEndpoint> endpoints = Collections.singletonList(new 
FlightEndpoint(ticket, location));
 
         return new FlightInfo(schema, descriptor, endpoints, -1, -1);
     }
+
+    private static void handleStreamException(Exception e, String errMsg, 
ServerStreamListener listener) {
+        LOG.error(errMsg, e);
+        
listener.error(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException());
+        throw 
CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java
new file mode 100644
index 00000000000..4b314a16c5f
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.service.arrowflight;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.service.ExecuteEnv;
+import org.apache.doris.service.FrontendServiceImpl;
+import org.apache.doris.thrift.TColumnDef;
+import org.apache.doris.thrift.TColumnDesc;
+import org.apache.doris.thrift.TDescribeTablesParams;
+import org.apache.doris.thrift.TDescribeTablesResult;
+import org.apache.doris.thrift.TGetDbsParams;
+import org.apache.doris.thrift.TGetDbsResult;
+import org.apache.doris.thrift.TGetTablesParams;
+import org.apache.doris.thrift.TListTableStatusResult;
+import org.apache.doris.thrift.TTableStatus;
+
+import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
+import org.apache.arrow.flight.sql.FlightSqlColumnMetadata;
+import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetDbSchemas;
+import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTables;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ZeroVector;
+import org.apache.arrow.vector.complex.BaseRepeatedValueVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.ipc.WriteChannel;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.Text;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class FlightSqlSchemaHelper {
+    private static final Logger LOG = 
LogManager.getLogger(FlightSqlSchemaHelper.class);
+    private final ConnectContext ctx;
+    private final FrontendServiceImpl impl;
+    private boolean includeSchema;
+    private String catalogFilterPattern = null;
+    private String dbSchemaFilterPattern = null;
+    private String tableNameFilterPattern = null;
+    private List<String> tableTypesList = null;
+
+    public FlightSqlSchemaHelper(ConnectContext context) {
+        ctx = context;
+        impl = new FrontendServiceImpl(ExecuteEnv.getInstance());
+    }
+
+    private static final byte[] EMPTY_SERIALIZED_SCHEMA = 
getSerializedSchema(Collections.emptyList());
+
+    /**
+     * Convert Doris data type to an arrowType.
+     * <p>
+     * Ref: `convert_to_arrow_type` in be/src/util/arrow/row_batch.cpp.
+     * which is consistent with the type of Arrow data returned by Doris Arrow 
Flight Sql query.
+     */
+    private static ArrowType getArrowType(PrimitiveType primitiveType, Integer 
precision, Integer scale,
+            String timeZone) {
+        switch (primitiveType) {
+            case BOOLEAN:
+                return new ArrowType.Bool();
+            case TINYINT:
+                return new ArrowType.Int(8, true);
+            case SMALLINT:
+                return new ArrowType.Int(16, true);
+            case INT:
+            case IPV4:
+                return new ArrowType.Int(32, true);
+            case BIGINT:
+                return new ArrowType.Int(64, true);
+            case FLOAT:
+                return new 
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
+            case DOUBLE:
+                return new 
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
+            case LARGEINT:
+            case VARCHAR:
+            case STRING:
+            case CHAR:
+            case DATETIME:
+            case DATE:
+            case JSONB:
+            case IPV6:
+            case VARIANT:
+                return new ArrowType.Utf8();
+            case DATEV2:
+                return new ArrowType.Date(DateUnit.MILLISECOND);
+            case DATETIMEV2:
+                if (scale > 3) {
+                    return new ArrowType.Timestamp(TimeUnit.MICROSECOND, 
timeZone);
+                } else if (scale > 0) {
+                    return new ArrowType.Timestamp(TimeUnit.MILLISECOND, 
timeZone);
+                } else {
+                    return new ArrowType.Timestamp(TimeUnit.SECOND, timeZone);
+                }
+            case DECIMAL32:
+            case DECIMAL64:
+            case DECIMAL128:
+                return new ArrowType.Decimal(precision, scale, 128);
+            case DECIMAL256:
+                return new ArrowType.Decimal(precision, scale, 256);
+            case DECIMALV2:
+                return new ArrowType.Decimal(27, 9, 128);
+            case HLL:
+            case BITMAP:
+            case QUANTILE_STATE:
+                return new ArrowType.Binary();
+            case MAP:
+                return new ArrowType.Map(false);
+            case ARRAY:
+                return new ArrowType.List();
+            case STRUCT:
+                return new ArrowType.Struct();
+            default:
+                return new ArrowType.Null();
+        }
+    }
+
+    private static ArrowType columnDescToArrowType(final TColumnDesc desc) {
+        PrimitiveType primitiveType = 
PrimitiveType.fromThrift(desc.getColumnType());
+        Integer precision = desc.isSetColumnPrecision() ? 
desc.getColumnPrecision() : null;
+        Integer scale = desc.isSetColumnScale() ? desc.getColumnScale() : null;
+        // TODO there is no timezone in TColumnDesc, so use current timezone.
+        String timeZone = 
JdbcToArrowUtils.getUtcCalendar().getTimeZone().getID();
+        return getArrowType(primitiveType, precision, scale, timeZone);
+    }
+
+    private static Map<String, String> createFlightSqlColumnMetadata(final 
String dbName, final String tableName,
+            final TColumnDesc desc) {
+        final FlightSqlColumnMetadata.Builder columnMetadataBuilder = new 
FlightSqlColumnMetadata.Builder().schemaName(
+                        
dbName).tableName(tableName).typeName(PrimitiveType.fromThrift(desc.getColumnType()).toString())
+                
.isAutoIncrement(false).isCaseSensitive(false).isReadOnly(true).isSearchable(true);
+
+        if (desc.isSetColumnPrecision()) {
+            columnMetadataBuilder.precision(desc.getColumnPrecision());
+        }
+        if (desc.isSetColumnScale()) {
+            columnMetadataBuilder.scale(desc.getColumnScale());
+        }
+        return columnMetadataBuilder.build().getMetadataMap();
+    }
+
+    protected static byte[] getSerializedSchema(List<Field> fields) {
+        if (EMPTY_SERIALIZED_SCHEMA == null && fields == null) {
+            fields = Collections.emptyList();
+        } else if (fields == null) {
+            return Arrays.copyOf(EMPTY_SERIALIZED_SCHEMA, 
EMPTY_SERIALIZED_SCHEMA.length);
+        }
+
+        final ByteArrayOutputStream columnOutputStream = new 
ByteArrayOutputStream();
+        final Schema schema = new Schema(fields);
+
+        try {
+            MessageSerializer.serialize(new 
WriteChannel(Channels.newChannel(columnOutputStream)), schema);
+        } catch (final IOException e) {
+            throw new RuntimeException("IO Error when serializing schema '" + 
schema + "'.", e);
+        }
+
+        return columnOutputStream.toByteArray();
+    }
+
+    /**
+     * Set in the Tables request object the parameter that user passed via 
CommandGetTables.
+     */
+    public void setParameterForGetTables(CommandGetTables command) {
+        includeSchema = command.getIncludeSchema();
+        catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : 
"internal";
+        dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? 
command.getDbSchemaFilterPattern() : null;
+        tableNameFilterPattern = command.hasTableNameFilterPattern() ? 
command.getTableNameFilterPattern() : null;
+        tableTypesList = command.getTableTypesList().isEmpty() ? null : 
command.getTableTypesList();
+    }
+
+    /**
+     * Set in the Schemas request object the parameter that user passed via 
CommandGetDbSchemas.
+     */
+    public void setParameterForGetDbSchemas(CommandGetDbSchemas command) {
+        catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : 
"internal";
+        dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? 
command.getDbSchemaFilterPattern() : null;
+    }
+
+    /**
+     * Call FrontendServiceImpl->getDbNames.
+     */
+    private TGetDbsResult getDbNames() throws TException {
+        TGetDbsParams getDbsParams = new TGetDbsParams();
+        if (catalogFilterPattern != null) {
+            getDbsParams.setCatalog(catalogFilterPattern);
+        }
+        if (dbSchemaFilterPattern != null) {
+            getDbsParams.setPattern(dbSchemaFilterPattern);
+        }
+        
getDbsParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
+        return impl.getDbNames(getDbsParams);
+    }
+
+    /**
+     * Call FrontendServiceImpl->listTableStatus.
+     */
+    private TListTableStatusResult listTableStatus(String dbName, String 
catalogName) throws TException {
+        TGetTablesParams getTablesParams = new TGetTablesParams();
+        getTablesParams.setDb(dbName);
+        if (!catalogName.isEmpty()) {
+            getTablesParams.setCatalog(catalogName);
+        }
+        if (tableNameFilterPattern != null) {
+            getTablesParams.setPattern(tableNameFilterPattern);
+        }
+        if (tableTypesList != null) {
+            getTablesParams.setType(tableTypesList.get(0)); // currently only 
one type is supported.
+        }
+        
getTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
+        return impl.listTableStatus(getTablesParams);
+    }
+
+    /**
+     * Call FrontendServiceImpl->describeTables.
+     */
+    private TDescribeTablesResult describeTables(String dbName, String 
catalogName, List<String> tablesName)
+            throws TException {
+        TDescribeTablesParams describeTablesParams = new 
TDescribeTablesParams();
+        describeTablesParams.setDb(dbName);
+        if (!catalogName.isEmpty()) {
+            describeTablesParams.setCatalog(catalogName);
+        }
+        describeTablesParams.setTablesName(tablesName);
+        
describeTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
+        return impl.describeTables(describeTablesParams);
+    }
+
+    /**
+     * Construct <tableName, List<ArrowType>>
+     */
+    private Map<String, List<Field>> buildTableToFields(String dbName, 
TDescribeTablesResult describeTablesResult,
+            List<String> tablesName) {
+        Map<String, List<Field>> tableToFields = new HashMap<>();
+        int columnIndex = 0;
+        for (int tableIndex = 0; tableIndex < 
describeTablesResult.getTablesOffsetSize(); tableIndex++) {
+            String tableName = tablesName.get(tableIndex);
+            final List<Field> fields = new ArrayList<>();
+            Integer tableOffset = 
describeTablesResult.getTablesOffset().get(tableIndex);
+            for (; columnIndex < tableOffset; columnIndex++) {
+                TColumnDef columnDef = 
describeTablesResult.getColumns().get(columnIndex);
+                TColumnDesc columnDesc = columnDef.getColumnDesc();
+                final ArrowType columnArrowType = 
columnDescToArrowType(columnDesc);
+
+                List<Field> columnArrowTypeChildren;
+                // Arrow complex types may require children fields for parsing 
the schema on C++
+                switch (columnArrowType.getTypeID()) {
+                    case List:
+                    case LargeList:
+                    case FixedSizeList:
+                        columnArrowTypeChildren = Collections.singletonList(
+                                
Field.notNullable(BaseRepeatedValueVector.DATA_VECTOR_NAME,
+                                        
ZeroVector.INSTANCE.getField().getType()));
+                        break;
+                    case Map:
+                        columnArrowTypeChildren = Collections.singletonList(
+                                Field.notNullable(MapVector.DATA_VECTOR_NAME, 
new ArrowType.List()));
+                        break;
+                    case Struct:
+                        columnArrowTypeChildren = Collections.emptyList();
+                        break;
+                    default:
+                        columnArrowTypeChildren = null;
+                        break;
+                }
+
+                final Field field = new Field(columnDesc.getColumnName(),
+                        new FieldType(columnDesc.isIsAllowNull(), 
columnArrowType, null,
+                                createFlightSqlColumnMetadata(dbName, 
tableName, columnDesc)), columnArrowTypeChildren);
+                fields.add(field);
+            }
+            tableToFields.put(tableName, fields);
+        }
+        return tableToFields;
+    }
+
+    /**
+     * for FlightSqlProducer Schemas.GET_CATALOGS_SCHEMA
+     */
+    public void getCatalogs(VectorSchemaRoot vectorSchemaRoot) throws 
TException {
+        VarCharVector catalogNameVector = (VarCharVector) 
vectorSchemaRoot.getVector("catalog_name");
+
+        Set<String> catalogsSet = new LinkedHashSet<>();
+        catalogsSet.add("internal"); // An ordered Set with "internal" first.
+        for (CatalogIf catalog : 
Env.getCurrentEnv().getCatalogMgr().listCatalogs()) {
+            catalogsSet.add(catalog.getName());
+        }
+
+        int catalogIndex = 0;
+        for (String catalog : catalogsSet) {
+            catalogNameVector.setSafe(catalogIndex, new Text(catalog));
+            catalogIndex++;
+        }
+        vectorSchemaRoot.setRowCount(catalogIndex);
+    }
+
+    /**
+     * for FlightSqlProducer Schemas.GET_SCHEMAS_SCHEMA
+     */
+    public void getSchemas(VectorSchemaRoot vectorSchemaRoot) throws 
TException {
+        VarCharVector catalogNameVector = (VarCharVector) 
vectorSchemaRoot.getVector("catalog_name");
+        VarCharVector schemaNameVector = (VarCharVector) 
vectorSchemaRoot.getVector("db_schema_name");
+
+        TGetDbsResult getDbsResult = getDbNames();
+        for (int dbIndex = 0; dbIndex < getDbsResult.getDbs().size(); 
dbIndex++) {
+            String dbName = getDbsResult.getDbs().get(dbIndex);
+            String catalogName = getDbsResult.isSetCatalogs() ? 
getDbsResult.getCatalogs().get(dbIndex) : "";
+            catalogNameVector.setSafe(dbIndex, new Text(catalogName));
+            schemaNameVector.setSafe(dbIndex, new Text(dbName));
+        }
+        vectorSchemaRoot.setRowCount(getDbsResult.getDbs().size());
+    }
+
+    /**
+     * for FlightSqlProducer Schemas.GET_TABLES_SCHEMA_NO_SCHEMA and 
Schemas.GET_TABLES_SCHEMA
+     */
+    public void getTables(VectorSchemaRoot vectorSchemaRoot) throws TException 
{
+        VarCharVector catalogNameVector = (VarCharVector) 
vectorSchemaRoot.getVector("catalog_name");
+        VarCharVector schemaNameVector = (VarCharVector) 
vectorSchemaRoot.getVector("db_schema_name");
+        VarCharVector tableNameVector = (VarCharVector) 
vectorSchemaRoot.getVector("table_name");
+        VarCharVector tableTypeVector = (VarCharVector) 
vectorSchemaRoot.getVector("table_type");
+        VarBinaryVector schemaVector = (VarBinaryVector) 
vectorSchemaRoot.getVector("table_schema");
+
+        int tablesCount = 0;
+        TGetDbsResult getDbsResult = getDbNames();
+        for (int dbIndex = 0; dbIndex < getDbsResult.getDbs().size(); 
dbIndex++) {
+            String dbName = getDbsResult.getDbs().get(dbIndex);
+            String catalogName = getDbsResult.isSetCatalogs() ? 
getDbsResult.getCatalogs().get(dbIndex) : "";
+            TListTableStatusResult listTableStatusResult = 
listTableStatus(dbName, catalogName);
+
+            Map<String, List<Field>> tableToFields;
+            if (includeSchema) {
+                List<String> tablesName = new ArrayList<>();
+                for (TTableStatus tableStatus : 
listTableStatusResult.getTables()) {
+                    tablesName.add(tableStatus.getName());
+                }
+                TDescribeTablesResult describeTablesResult = 
describeTables(dbName, catalogName, tablesName);
+                tableToFields = buildTableToFields(dbName, 
describeTablesResult, tablesName);
+            } else {
+                tableToFields = null;
+            }
+
+            for (TTableStatus tableStatus : listTableStatusResult.getTables()) 
{
+                catalogNameVector.setSafe(tablesCount, new Text(catalogName));
+                schemaNameVector.setSafe(tablesCount, new Text(dbName));
+                tableNameVector.setSafe(tablesCount, new 
Text(tableStatus.getName()));
+                tableTypeVector.setSafe(tablesCount, new 
Text(tableStatus.getType()));
+                if (includeSchema) {
+                    List<Field> fields = 
tableToFields.get(tableStatus.getName());
+                    schemaVector.setSafe(tablesCount, 
getSerializedSchema(fields));
+                }
+                tablesCount++;
+            }
+        }
+        vectorSchemaRoot.setRowCount(tablesCount);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java
index b605dff66b6..c93e0b5a309 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java
@@ -49,7 +49,12 @@ public final class FlightAuthUtils {
             Logger logger) {
         try {
             List<UserIdentity> currentUserIdentity = Lists.newArrayList();
-
+            // If the password is empty, DBeaver will pass "null" string for 
authentication.
+            // This behavior of DBeaver is strange, but we have to be 
compatible with it, of course,
+            // it may be a problem with Arrow Flight Jdbc driver.
+            // Here, "null" is converted to null, if user's password is really 
the string "null",
+            // authentication will fail. Usually, the user's password will not 
be "null", let's hope so.
+            password = (password.equals("null")) ? null : password;
             Env.getCurrentEnv().getAuth().checkPlainPassword(username, 
remoteIp, password, currentUserIdentity);
             Preconditions.checkState(currentUserIdentity.size() == 1);
             return FlightAuthResult.of(username, currentUserIdentity.get(0), 
remoteIp);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to