This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 9bee4582e7a branch-2.1: [fix](query tvf): resolve column mismatch 
error in JDBC query function (#54077) (#54249)
9bee4582e7a is described below

commit 9bee4582e7a058247dd6b7b61c6c13cb37b637c7
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Mon Aug 4 19:44:27 2025 -0700

    branch-2.1: [fix](query tvf): resolve column mismatch error in JDBC query 
function (#54077) (#54249)
    
    bp #54077
    
    ---------
    
    Co-authored-by: XnY-wei <[email protected]>
    Co-authored-by: weixingyu12 <[email protected]>
---
 be/src/pipeline/exec/jdbc_scan_operator.cpp        |   3 +-
 be/src/pipeline/exec/jdbc_scan_operator.h          |   1 +
 be/src/vec/exec/scan/new_jdbc_scan_node.cpp        |   7 +-
 be/src/vec/exec/scan/new_jdbc_scan_node.h          |   1 +
 be/src/vec/exec/scan/new_jdbc_scanner.cpp          |  13 ++-
 be/src/vec/exec/scan/new_jdbc_scanner.h            |   5 +-
 be/src/vec/exec/vjdbc_connector.cpp                |   7 +-
 be/src/vec/exec/vjdbc_connector.h                  |   1 +
 .../org/apache/doris/jdbc/BaseJdbcExecutor.java    |  94 +++++++++++++++---
 .../org/apache/doris/jdbc/MySQLJdbcExecutor.java   | 105 +++++++++++----------
 .../doris/datasource/jdbc/source/JdbcScanNode.java |   1 +
 gensrc/thrift/PlanNodes.thrift                     |   1 +
 gensrc/thrift/Types.thrift                         |   1 +
 .../jdbc/test_doris_jdbc_catalog.out               | Bin 3627 -> 3724 bytes
 .../jdbc/test_doris_jdbc_catalog.groovy            |  36 ++++---
 15 files changed, 188 insertions(+), 88 deletions(-)

diff --git a/be/src/pipeline/exec/jdbc_scan_operator.cpp 
b/be/src/pipeline/exec/jdbc_scan_operator.cpp
index 35ad7ec0490..e145a719e36 100644
--- a/be/src/pipeline/exec/jdbc_scan_operator.cpp
+++ b/be/src/pipeline/exec/jdbc_scan_operator.cpp
@@ -30,7 +30,7 @@ std::string JDBCScanLocalState::name_suffix() const {
 Status JDBCScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* 
scanners) {
     auto& p = _parent->cast<JDBCScanOperatorX>();
     std::unique_ptr<vectorized::NewJdbcScanner> scanner = 
vectorized::NewJdbcScanner::create_unique(
-            state(), this, p._limit, p._tuple_id, p._query_string, 
p._table_type,
+            state(), this, p._limit, p._tuple_id, p._query_string, 
p._table_type, p._is_tvf,
             _scanner_profile.get());
     RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts));
     scanners->push_back(std::move(scanner));
@@ -45,6 +45,7 @@ JDBCScanOperatorX::JDBCScanOperatorX(ObjectPool* pool, const 
TPlanNode& tnode, i
           _query_string(tnode.jdbc_scan_node.query_string),
           _table_type(tnode.jdbc_scan_node.table_type) {
     _output_tuple_id = tnode.jdbc_scan_node.tuple_id;
+    _is_tvf = tnode.jdbc_scan_node.__isset.is_tvf ? 
tnode.jdbc_scan_node.is_tvf : false;
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/jdbc_scan_operator.h 
b/be/src/pipeline/exec/jdbc_scan_operator.h
index 825e01acc2a..07dfec37239 100644
--- a/be/src/pipeline/exec/jdbc_scan_operator.h
+++ b/be/src/pipeline/exec/jdbc_scan_operator.h
@@ -62,6 +62,7 @@ private:
     TupleId _tuple_id;
     std::string _query_string;
     TOdbcTableType::type _table_type;
+    bool _is_tvf;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp 
b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
index f8219b4337e..f6a1e3e6ec1 100644
--- a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
@@ -44,6 +44,7 @@ NewJdbcScanNode::NewJdbcScanNode(ObjectPool* pool, const 
TPlanNode& tnode,
           _query_string(tnode.jdbc_scan_node.query_string),
           _table_type(tnode.jdbc_scan_node.table_type) {
     _output_tuple_id = tnode.jdbc_scan_node.tuple_id;
+    _is_tvf = tnode.jdbc_scan_node.__isset.is_tvf ? 
tnode.jdbc_scan_node.is_tvf : false;
 }
 
 std::string NewJdbcScanNode::get_name() {
@@ -65,9 +66,9 @@ Status 
NewJdbcScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
     if (_eos == true) {
         return Status::OK();
     }
-    std::unique_ptr<NewJdbcScanner> scanner =
-            NewJdbcScanner::create_unique(_state, this, _limit_per_scanner, 
_tuple_id,
-                                          _query_string, _table_type, 
_state->runtime_profile());
+    std::unique_ptr<NewJdbcScanner> scanner = NewJdbcScanner::create_unique(
+            _state, this, _limit_per_scanner, _tuple_id, _query_string, 
_table_type, _is_tvf,
+            _state->runtime_profile());
     RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts));
     scanners->push_back(std::move(scanner));
     return Status::OK();
diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.h 
b/be/src/vec/exec/scan/new_jdbc_scan_node.h
index 54a8dc7d3dd..ee284b7e4a3 100644
--- a/be/src/vec/exec/scan/new_jdbc_scan_node.h
+++ b/be/src/vec/exec/scan/new_jdbc_scan_node.h
@@ -51,6 +51,7 @@ private:
     TupleId _tuple_id;
     std::string _query_string;
     TOdbcTableType::type _table_type;
+    bool _is_tvf;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp 
b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
index 830b4e51383..cdcdf677e59 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
@@ -38,26 +38,30 @@
 namespace doris::vectorized {
 NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, 
int64_t limit,
                                const TupleId& tuple_id, const std::string& 
query_string,
-                               TOdbcTableType::type table_type, 
RuntimeProfile* profile)
+                               TOdbcTableType::type table_type, bool is_tvf,
+                               RuntimeProfile* profile)
         : VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
           _jdbc_eos(false),
           _tuple_id(tuple_id),
           _query_string(query_string),
           _tuple_desc(nullptr),
-          _table_type(table_type) {
+          _table_type(table_type),
+          _is_tvf(is_tvf) {
     _init_profile(get_parent()->_scanner_profile);
 }
 
 NewJdbcScanner::NewJdbcScanner(RuntimeState* state,
                                doris::pipeline::JDBCScanLocalState* 
local_state, int64_t limit,
                                const TupleId& tuple_id, const std::string& 
query_string,
-                               TOdbcTableType::type table_type, 
RuntimeProfile* profile)
+                               TOdbcTableType::type table_type, bool is_tvf,
+                               RuntimeProfile* profile)
         : VScanner(state, local_state, limit, profile),
           _jdbc_eos(false),
           _tuple_id(tuple_id),
           _query_string(query_string),
           _tuple_desc(nullptr),
-          _table_type(table_type) {
+          _table_type(table_type),
+          _is_tvf(is_tvf) {
     _init_profile(local_state->_scanner_profile);
 }
 
@@ -97,6 +101,7 @@ Status NewJdbcScanner::prepare(RuntimeState* state, const 
VExprContextSPtrs& con
     _jdbc_param.query_string = std::move(_query_string);
     _jdbc_param.use_transaction = false; // not useful for scanner but only 
sink.
     _jdbc_param.table_type = _table_type;
+    _jdbc_param.is_tvf = _is_tvf;
     _jdbc_param.connection_pool_min_size = 
jdbc_table->connection_pool_min_size();
     _jdbc_param.connection_pool_max_size = 
jdbc_table->connection_pool_max_size();
     _jdbc_param.connection_pool_max_life_time = 
jdbc_table->connection_pool_max_life_time();
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.h 
b/be/src/vec/exec/scan/new_jdbc_scanner.h
index bda6d6b1dc8..5020241836b 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.h
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.h
@@ -48,10 +48,10 @@ public:
 
     NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit,
                    const TupleId& tuple_id, const std::string& query_string,
-                   TOdbcTableType::type table_type, RuntimeProfile* profile);
+                   TOdbcTableType::type table_type, bool is_tvf, 
RuntimeProfile* profile);
     NewJdbcScanner(RuntimeState* state, doris::pipeline::JDBCScanLocalState* 
parent, int64_t limit,
                    const TupleId& tuple_id, const std::string& query_string,
-                   TOdbcTableType::type table_type, RuntimeProfile* profile);
+                   TOdbcTableType::type table_type, bool is_tvf, 
RuntimeProfile* profile);
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
 
@@ -87,6 +87,7 @@ private:
     const TupleDescriptor* _tuple_desc = nullptr;
     // the sql query database type: like mysql, PG...
     TOdbcTableType::type _table_type;
+    bool _is_tvf;
     // Scanner of JDBC.
     std::unique_ptr<JdbcConnector> _jdbc_connector;
     JdbcConnectorParam _jdbc_param;
diff --git a/be/src/vec/exec/vjdbc_connector.cpp 
b/be/src/vec/exec/vjdbc_connector.cpp
index f3f4b4d8a1c..1a68e5d6c61 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -144,6 +144,7 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
         ctor_params.__set_connection_pool_cache_clear_time(
                 config::jdbc_connection_pool_cache_clear_time_sec);
         
ctor_params.__set_connection_pool_keep_alive(_conn_param.connection_pool_keep_alive);
+        ctor_params.__set_is_tvf(_conn_param.is_tvf);
 
         jbyteArray ctor_params_bytes;
         // Pushed frame will be popped when jni_frame goes out-of-scope.
@@ -209,8 +210,10 @@ Status JdbcConnector::query() {
             return Status::InternalError("GetJniExceptionMsg meet error, 
query={}, msg={}",
                                          _conn_param.query_string, 
status.to_string());
         }
-        if (colunm_count != materialize_num) {
-            return Status::InternalError("input and output column num not 
equal of jdbc query.");
+        if (colunm_count < materialize_num) {
+            return Status::InternalError(
+                    "JDBC query returned fewer columns ({}) than required 
({}).", colunm_count,
+                    materialize_num);
         }
     }
 
diff --git a/be/src/vec/exec/vjdbc_connector.h 
b/be/src/vec/exec/vjdbc_connector.h
index a09d390dc7c..ed202e21350 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -56,6 +56,7 @@ struct JdbcConnectorParam {
     std::string table_name;
     bool use_transaction = false;
     TOdbcTableType::type table_type;
+    bool is_tvf = false;
     int32_t connection_pool_min_size = -1;
     int32_t connection_pool_max_size = -1;
     int32_t connection_pool_max_wait_time = -1;
diff --git 
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
index 383f1fe9aa1..82eb3cfeefa 100644
--- 
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
+++ 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
@@ -56,6 +56,7 @@ import java.sql.Statement;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -79,6 +80,19 @@ public abstract class BaseJdbcExecutor implements 
JdbcExecutor {
     protected String jdbcDriverVersion;
     private static final Map<URL, ClassLoader> classLoaderMap = 
Maps.newConcurrentMap();
 
+    // col name(lowercase) -> index in resultSetMetaData
+    // this map is only used for "query()" tvf, so only valid if isTvf is true.
+    // Because for "query()" tvf, the sql string is written by user, so the 
column name in resultSetMetaData
+    // maybe larger than the column name in outputTable.
+    // For example, if the sql is "select a from query('select a,b from tbl')",
+    // the column num in resultSetMetaData is 2, but the outputTable only has 
1 column "a".
+    // But if the sql is "select a from (select a,b from tbl)x",
+    // the column num in resultSetMetaData is 1, and the outputTable also has 
1 column "a".
+    // Because the planner will do the column pruning before generating the 
sql string.
+    // So, for query() tvf, we need to map the column name in outputTable to 
the column index in resultSetMetaData.
+    private Map<String, Integer> resultSetColumnMap = null;
+    private boolean isTvf = false;
+
     public BaseJdbcExecutor(byte[] thriftParams) throws Exception {
         setJdbcDriverSystemProperties();
         TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
@@ -107,6 +121,7 @@ public abstract class BaseJdbcExecutor implements 
JdbcExecutor {
         
JdbcDataSource.getDataSource().setCleanupInterval(request.connection_pool_cache_clear_time);
         init(config, request.statement);
         this.jdbcDriverVersion = getJdbcDriverVersion();
+        this.isTvf = request.isSetIsTvf() ? request.is_tvf : false;
     }
 
     public void close() throws Exception {
@@ -206,32 +221,59 @@ public abstract class BaseJdbcExecutor implements 
JdbcExecutor {
 
             if (isNullableString == null || replaceString == null) {
                 throw new IllegalArgumentException(
-                        "Output parameters 'is_nullable' and 'replace_string' 
are required.");
+                    "Output parameters 'is_nullable' and 'replace_string' are 
required.");
             }
 
             String[] nullableList = isNullableString.split(",");
             String[] replaceStringList = replaceString.split(",");
             curBlockRows = 0;
-            int columnCount = resultSetMetaData.getColumnCount();
 
-            initializeBlock(columnCount, replaceStringList, batchSize, 
outputTable);
+            int outputColumnCount = outputTable.getColumns().length;
+            initializeBlock(outputColumnCount, replaceStringList, batchSize, 
outputTable);
+
+            // the resultSetColumnMap is only for "query()" tvf
+            if (this.isTvf && this.resultSetColumnMap == null) {
+                this.resultSetColumnMap = new HashMap<>();
+                int resultSetColumnCount = resultSetMetaData.getColumnCount();
+                for (int i = 1; i <= resultSetColumnCount; i++) {
+                    String columnName = 
resultSetMetaData.getColumnName(i).trim().toLowerCase();
+                    resultSetColumnMap.put(columnName, i);
+                }
+            }
 
             do {
-                for (int i = 0; i < columnCount; ++i) {
-                    ColumnType type = outputTable.getColumnType(i);
-                    block.get(i)[curBlockRows] = getColumnValue(i, type, 
replaceStringList);
+                for (int i = 0; i < outputColumnCount; ++i) {
+                    String outputColumnName = outputTable.getFields()[i];
+                    int columnIndex = getRealColumnIndex(outputColumnName, i);
+                    if (columnIndex > -1) {
+                        ColumnType type = convertTypeIfNecessary(i, 
outputTable.getColumnType(i), replaceStringList);
+                        block.get(i)[curBlockRows] = 
getColumnValue(columnIndex, type, replaceStringList);
+                    } else {
+                        throw new RuntimeException("Column not found in 
resultSetColumnMap: " + outputColumnName);
+                    }
                 }
                 curBlockRows++;
             } while (curBlockRows < batchSize && resultSet.next());
 
-            for (int i = 0; i < columnCount; ++i) {
-                ColumnType type = outputTable.getColumnType(i);
-                Object[] columnData = block.get(i);
-                Class<?> componentType = 
columnData.getClass().getComponentType();
-                Object[] newColumn = (Object[]) 
Array.newInstance(componentType, curBlockRows);
-                System.arraycopy(columnData, 0, newColumn, 0, curBlockRows);
-                boolean isNullable = Boolean.parseBoolean(nullableList[i]);
-                outputTable.appendData(i, newColumn, getOutputConverter(type, 
replaceStringList[i]), isNullable);
+            for (int i = 0; i < outputColumnCount; ++i) {
+                String outputColumnName = outputTable.getFields()[i];
+                int columnIndex = getRealColumnIndex(outputColumnName, i);
+                if (columnIndex > -1) {
+                    ColumnType type = outputTable.getColumnType(i);
+                    Object[] columnData = block.get(i);
+                    Class<?> componentType = 
columnData.getClass().getComponentType();
+                    Object[] newColumn = (Object[]) 
Array.newInstance(componentType, curBlockRows);
+                    System.arraycopy(columnData, 0, newColumn, 0, 
curBlockRows);
+                    boolean isNullable = Boolean.parseBoolean(nullableList[i]);
+                    outputTable.appendData(
+                            i,
+                            newColumn,
+                            getOutputConverter(type, replaceStringList[i]),
+                            isNullable
+                    );
+                } else {
+                    throw new RuntimeException("Column not found in 
resultSetColumnMap: " + outputColumnName);
+                }
             }
         } catch (Exception e) {
             LOG.warn("jdbc get block address exception: ", e);
@@ -242,6 +284,14 @@ public abstract class BaseJdbcExecutor implements 
JdbcExecutor {
         return outputTable.getMetaAddress();
     }
 
+    private int getRealColumnIndex(String outputColumnName, int 
indexInOutputTable) {
+        // -1 because ResultSetMetaData column index starts from 1, but index 
in outputTable starts from 0.
+        int columnIndex = this.isTvf
+                ? 
resultSetColumnMap.getOrDefault(outputColumnName.toLowerCase(), 0) - 1 : 
indexInOutputTable;
+        return columnIndex;
+    }
+
+
     protected void initializeBlock(int columnCount, String[] 
replaceStringList, int batchSizeNum,
             VectorTable outputTable) {
         for (int i = 0; i < columnCount; ++i) {
@@ -442,6 +492,19 @@ public abstract class BaseJdbcExecutor implements 
JdbcExecutor {
     protected abstract Object getColumnValue(int columnIndex, ColumnType type, 
String[] replaceStringList)
             throws SQLException;
 
+    /**
+     * Some special column types (like bitmap/hll in Doris) may need to be 
converted to string.
+     * Subclass can override this method to handle such conversions.
+     *
+     * @param outputIdx
+     * @param origType
+     * @param replaceStringList
+     * @return
+     */
+    protected ColumnType convertTypeIfNecessary(int outputIdx, ColumnType 
origType, String[] replaceStringList) {
+        return origType;
+    }
+
     /*
     | Type                                        | Java Array Type            
|
     
|---------------------------------------------|----------------------------|
@@ -650,3 +713,6 @@ public abstract class BaseJdbcExecutor implements 
JdbcExecutor {
         return hexString.toString();
     }
 }
+
+
+
diff --git 
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/MySQLJdbcExecutor.java
 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/MySQLJdbcExecutor.java
index 4e5af95211b..9ceb613425a 100644
--- 
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/MySQLJdbcExecutor.java
+++ 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/MySQLJdbcExecutor.java
@@ -101,60 +101,67 @@ public class MySQLJdbcExecutor extends BaseJdbcExecutor {
         }
     }
 
+    @Override
+    protected ColumnType convertTypeIfNecessary(int outputIdx, ColumnType 
origType, String[] replaceStringList) {
+        if (replaceStringList[outputIdx].equals("bitmap") || 
replaceStringList[outputIdx].equals("hll")) {
+            return new ColumnType(origType.getName(), Type.BYTE);
+        }
+        return origType;
+    }
+
     @Override
     protected Object getColumnValue(int columnIndex, ColumnType type, String[] 
replaceStringList) throws SQLException {
-        if (replaceStringList[columnIndex].equals("bitmap") || 
replaceStringList[columnIndex].equals("hll")) {
-            byte[] data = resultSet.getBytes(columnIndex + 1);
-            if (resultSet.wasNull()) {
-                return null;
-            }
-            return data;
-        } else {
-            switch (type.getType()) {
-                case BOOLEAN:
-                    return resultSet.getObject(columnIndex + 1, Boolean.class);
-                case TINYINT:
-                case SMALLINT:
-                case LARGEINT:
+        switch (type.getType()) {
+            case BOOLEAN:
+                return resultSet.getObject(columnIndex + 1, Boolean.class);
+            case TINYINT:
+            case SMALLINT:
+            case LARGEINT:
+                return resultSet.getObject(columnIndex + 1);
+            case INT:
+                return resultSet.getObject(columnIndex + 1, Integer.class);
+            case BIGINT:
+                return resultSet.getObject(columnIndex + 1, Long.class);
+            case FLOAT:
+                return resultSet.getObject(columnIndex + 1, Float.class);
+            case DOUBLE:
+                return resultSet.getObject(columnIndex + 1, Double.class);
+            case DECIMALV2:
+            case DECIMAL32:
+            case DECIMAL64:
+            case DECIMAL128:
+                return resultSet.getObject(columnIndex + 1, BigDecimal.class);
+            case DATE:
+            case DATEV2:
+                return resultSet.getObject(columnIndex + 1, LocalDate.class);
+            case DATETIME:
+            case DATETIMEV2:
+                return resultSet.getObject(columnIndex + 1, 
LocalDateTime.class);
+            case CHAR:
+            case VARCHAR:
+            case ARRAY:
+                return resultSet.getObject(columnIndex + 1, String.class);
+            case STRING: {
+                int jdbcType = resultSetMetaData.getColumnType(columnIndex + 
1);
+                // If it is a time type in mysql, or use mysql driver connect 
mariadb
+                // We need to obtain the string directly to ensure that we can 
obtain a time other than 24 hours.
+                // If it is another database, such as oceanbase, this 
processing will lose precision information,
+                // so the original processing method will be maintained for 
the time being.
+                if (jdbcType == Types.TIME && config.getTableType() == 
TOdbcTableType.MYSQL) {
+                    return resultSet.getString(columnIndex + 1);
+                } else {
                     return resultSet.getObject(columnIndex + 1);
-                case INT:
-                    return resultSet.getObject(columnIndex + 1, Integer.class);
-                case BIGINT:
-                    return resultSet.getObject(columnIndex + 1, Long.class);
-                case FLOAT:
-                    return resultSet.getObject(columnIndex + 1, Float.class);
-                case DOUBLE:
-                    return resultSet.getObject(columnIndex + 1, Double.class);
-                case DECIMALV2:
-                case DECIMAL32:
-                case DECIMAL64:
-                case DECIMAL128:
-                    return resultSet.getObject(columnIndex + 1, 
BigDecimal.class);
-                case DATE:
-                case DATEV2:
-                    return resultSet.getObject(columnIndex + 1, 
LocalDate.class);
-                case DATETIME:
-                case DATETIMEV2:
-                    return resultSet.getObject(columnIndex + 1, 
LocalDateTime.class);
-                case CHAR:
-                case VARCHAR:
-                case ARRAY:
-                    return resultSet.getObject(columnIndex + 1, String.class);
-                case STRING: {
-                    int jdbcType = resultSetMetaData.getColumnType(columnIndex 
+ 1);
-                    // If it is a time type in mysql, or use mysql driver 
connect mariadb
-                    // We need to obtain the string directly to ensure that we 
can obtain a time other than 24 hours.
-                    // If it is another database, such as oceanbase, this 
processing will lose precision information,
-                    // so the original processing method will be maintained 
for the time being.
-                    if (jdbcType == Types.TIME && config.getTableType() == 
TOdbcTableType.MYSQL) {
-                        return resultSet.getString(columnIndex + 1);
-                    } else {
-                        return resultSet.getObject(columnIndex + 1);
-                    }
                 }
-                default:
-                    throw new IllegalArgumentException("Unsupported column 
type: " + type.getType());
             }
+            case BYTE: {
+                byte[] data = resultSet.getBytes(columnIndex + 1);
+                if (resultSet.wasNull()) {
+                    return null;
+                }
+                return data;
+            }
+            default:
+                throw new IllegalArgumentException("Unsupported column type: " 
+ type.getType());
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
index 346c27a68bd..d1198bde357 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
@@ -282,6 +282,7 @@ public class JdbcScanNode extends ExternalScanNode {
             msg.jdbc_scan_node.setQueryString(getJdbcQueryStr());
         }
         msg.jdbc_scan_node.setTableType(jdbcType);
+        msg.jdbc_scan_node.setIsTvf(isTableValuedFunction);
     }
 
     @Override
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 7ccb12b3331..1f1b36782a0 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -613,6 +613,7 @@ struct TJdbcScanNode {
   2: optional string table_name
   3: optional string query_string
   4: optional Types.TOdbcTableType table_type
+  5: optional bool is_tvf
 }
 
 struct TBrokerScanNode {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 336988e4b88..7b0111c1ed9 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -437,6 +437,7 @@ struct TJdbcExecutorCtorParams {
   15: optional bool connection_pool_keep_alive
   16: optional i64 catalog_id
   17: optional string jdbc_driver_checksum
+  18: optional bool is_tvf
 }
 
 struct TJavaUdfExecutorCtorParams {
diff --git 
a/regression-test/data/external_table_p0/jdbc/test_doris_jdbc_catalog.out 
b/regression-test/data/external_table_p0/jdbc/test_doris_jdbc_catalog.out
index 9695f628fee..220c756a0e2 100644
Binary files 
a/regression-test/data/external_table_p0/jdbc/test_doris_jdbc_catalog.out and 
b/regression-test/data/external_table_p0/jdbc/test_doris_jdbc_catalog.out differ
diff --git 
a/regression-test/suites/external_table_p0/jdbc/test_doris_jdbc_catalog.groovy 
b/regression-test/suites/external_table_p0/jdbc/test_doris_jdbc_catalog.groovy
index c4fce17c62c..ec77abb51a4 100644
--- 
a/regression-test/suites/external_table_p0/jdbc/test_doris_jdbc_catalog.groovy
+++ 
b/regression-test/suites/external_table_p0/jdbc/test_doris_jdbc_catalog.groovy
@@ -231,20 +231,30 @@ suite("test_doris_jdbc_catalog", 
"p0,external,doris,external_docker,external_doc
     // test query tvf
     qt_sql """desc function query("catalog" = "doris_jdbc_catalog", "query" = 
"select * from regression_test_jdbc_catalog_p0.base");"""
 
-    order_qt_sql """ select varchar_col,tinyint_col from query("catalog" = 
"doris_jdbc_catalog", "query" = "select varchar_col,tinyint_col from 
regression_test_jdbc_catalog_p0.base");"""
+    order_qt_sql1 """ select varchar_col,tinyint_col from query("catalog" = 
"doris_jdbc_catalog", "query" = "select varchar_col,tinyint_col from 
regression_test_jdbc_catalog_p0.base");"""
 
-    order_qt_sql """ select tinyint_col,varchar_col from query("catalog" = 
"doris_jdbc_catalog", "query" = "select varchar_col,tinyint_col from 
regression_test_jdbc_catalog_p0.base");"""
+    order_qt_sql2 """ select tinyint_col,varchar_col from query("catalog" = 
"doris_jdbc_catalog", "query" = "select varchar_col,tinyint_col from 
regression_test_jdbc_catalog_p0.base");"""
 
-    //clean
-    qt_sql """select current_catalog()"""
-    sql "switch internal"
-    qt_sql """select current_catalog()"""
-    sql "use regression_test_jdbc_catalog_p0"
-    sql """ drop table if exists test_doris_jdbc_doris_in_tb """
-    sql """ drop table if exists bowen_hll_test """
-    sql """ drop table if exists base """
-    sql """ drop table if exists all_null_tbl """
-    sql """ drop table if exists arr """
-    sql """ drop table if exists test_insert_order """
+    order_qt_sql3 """ select varchar_col from query("catalog" = 
"doris_jdbc_catalog", "query" = "select varchar_col,tinyint_col from 
regression_test_jdbc_catalog_p0.base");"""
+
+    order_qt_sql4 """ select tinyint_col from query("catalog" = 
"doris_jdbc_catalog", "query" = "select varchar_col,tinyint_col from 
regression_test_jdbc_catalog_p0.base");"""
+
+    order_qt_sql5 """ with tmp as (select varchar_col,tinyint_col from 
query("catalog" = "doris_jdbc_catalog", "query" = "select 
varchar_col,tinyint_col from regression_test_jdbc_catalog_p0.base")) select 
tinyint_col from tmp;"""
+
+    order_qt_sql6 """ with tmp as (select varchar_col,tinyint_col from 
query("catalog" = "doris_jdbc_catalog", "query" = "select 
varchar_col,tinyint_col from regression_test_jdbc_catalog_p0.base")) select 
tinyint_col,varchar_col from tmp;"""
+
+    order_qt_sql7 """ with tmp as (select tinyint_col,varchar_col from 
query("catalog" = "doris_jdbc_catalog", "query" = "select 
varchar_col,tinyint_col from regression_test_jdbc_catalog_p0.base")) select 
tinyint_col from tmp;"""
+
+    // //clean
+    // qt_sql """select current_catalog()"""
+    // sql "switch internal"
+    // qt_sql """select current_catalog()"""
+    // sql "use regression_test_jdbc_catalog_p0"
+    // sql """ drop table if exists test_doris_jdbc_doris_in_tb """
+    // sql """ drop table if exists bowen_hll_test """
+    // sql """ drop table if exists base """
+    // sql """ drop table if exists all_null_tbl """
+    // sql """ drop table if exists arr """
+    // sql """ drop table if exists test_insert_order """
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to