This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch doris-for-zhongjin
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b53ccafb8998556a85818f5074b23993572d811d
Author: gaoxin <ashin...@outlook.com>
AuthorDate: Tue Apr 11 19:51:26 2023 +0800

    [feature](jni) map c++ block to java vector table and support oracle insert
    
    2.0.2
---
 be/src/common/config.h                             |   2 +
 be/src/exec/odbc_connector.h                       |   6 +
 be/src/exec/table_connector.cpp                    |   8 +-
 be/src/exec/table_connector.h                      |   4 +
 be/src/vec/exec/jni_connector.cpp                  |  78 +++++++++++
 be/src/vec/exec/jni_connector.h                    |  19 +++
 be/src/vec/exec/vjdbc_connector.cpp                |  59 +++++++++
 be/src/vec/exec/vjdbc_connector.h                  |   4 +
 be/src/vec/sink/vjdbc_table_sink.cpp               |   1 +
 build-for-release.sh                               |   2 +-
 .../java/org/apache/doris/catalog/JdbcTable.java   |  14 ++
 .../org/apache/doris/planner/JdbcTableSink.java    |   3 +
 .../org/apache/doris/jni/vec/VectorColumn.java     |  67 +++++++++-
 .../java/org/apache/doris/jni/vec/VectorTable.java |  55 +++++++-
 .../java/org/apache/doris/udf/JdbcExecutor.java    | 142 ++++++++++++++++++++-
 .../java/org/apache/doris/jni/JniScannerTest.java  |   9 +-
 gensrc/script/gen_build_version.sh                 |   2 +-
 gensrc/thrift/DataSinks.thrift                     |   1 +
 18 files changed, 461 insertions(+), 15 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 63d785b41c..e65c052014 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -936,6 +936,8 @@ CONF_mInt64(max_tablet_io_errors, "-1");
 // Page size of row column, default 4KB
 CONF_mInt64(row_column_page_size, "4096");
 
+CONF_mBool(enable_new_oracle_insert, "true");
+
 #ifdef BE_TEST
 // test s3
 CONF_String(test_s3_resource, "resource");
diff --git a/be/src/exec/odbc_connector.h b/be/src/exec/odbc_connector.h
index f7765a9fce..bd76c9dd9d 100644
--- a/be/src/exec/odbc_connector.h
+++ b/be/src/exec/odbc_connector.h
@@ -60,6 +60,12 @@ public:
     Status exec_write_sql(const std::u16string& insert_stmt,
                           const fmt::memory_buffer& insert_stmt_buffer) 
override;
 
+    Status exec_stmt_write(
+            vectorized::Block* block,
+            const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs) 
override {
+        return Status::OK();
+    }
+
     // use in ODBC transaction
     Status begin_trans() override; // should be call after connect and before 
query or init_to_write
     Status abort_trans() override; // should be call after transaction abort
diff --git a/be/src/exec/table_connector.cpp b/be/src/exec/table_connector.cpp
index 11e60d76cf..7855c0badf 100644
--- a/be/src/exec/table_connector.cpp
+++ b/be/src/exec/table_connector.cpp
@@ -22,6 +22,7 @@
 #include <glog/logging.h>
 #include <iconv.h>
 
+#include "common/config.h"
 #include "runtime/define_primitive_type.h"
 #include "runtime/primitive_type.h"
 #include "util/mysql_global.h"
@@ -134,7 +135,12 @@ Status TableConnector::append(const std::string& 
table_name, vectorized::Block*
         insert_stmt = utf8_to_u16string(_insert_stmt_buffer.data(),
                                         _insert_stmt_buffer.data() + 
_insert_stmt_buffer.size());
     }
-    RETURN_IF_ERROR(exec_write_sql(insert_stmt, _insert_stmt_buffer));
+
+    if (config::enable_new_oracle_insert && table_type == 
TOdbcTableType::ORACLE) {
+        RETURN_IF_ERROR(exec_stmt_write(block, output_vexpr_ctxs));
+    } else {
+        RETURN_IF_ERROR(exec_write_sql(insert_stmt, _insert_stmt_buffer));
+    }
     COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent);
     return Status::OK();
 }
diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h
index 872aa7fe14..0ecb8462ea 100644
--- a/be/src/exec/table_connector.h
+++ b/be/src/exec/table_connector.h
@@ -46,6 +46,10 @@ public:
     virtual Status abort_trans() = 0;  // should be call after transaction 
abort
     virtual Status finish_trans() = 0; // should be call after transaction 
commit
 
+    virtual Status exec_stmt_write(
+            vectorized::Block* block,
+            const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs) = 
0;
+
     virtual Status exec_write_sql(const std::u16string& insert_stmt,
                                   const fmt::memory_buffer& 
_insert_stmt_buffer) = 0;
 
diff --git a/be/src/vec/exec/jni_connector.cpp 
b/be/src/vec/exec/jni_connector.cpp
index fc94debe84..80c4f4eb8a 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -329,4 +329,82 @@ std::string JniConnector::get_hive_type(const 
TypeDescriptor& desc) {
         return "unsupported";
     }
 }
+
+Status JniConnector::generate_meta_info(Block* block, std::unique_ptr<long[]>& 
meta) {
+    std::vector<long> meta_data;
+    // insert number of rows
+    meta_data.emplace_back(block->rows());
+    for (int i = 0; i < block->columns(); ++i) {
+        auto& column_with_type_and_name = block->get_by_position(i);
+        auto& column_ptr = column_with_type_and_name.column;
+        auto& column_type = column_with_type_and_name.type;
+        TypeIndex logical_type = remove_nullable(column_type)->get_type_id();
+
+        // insert null map address
+        MutableColumnPtr data_column;
+        if (column_ptr->is_nullable()) {
+            auto* nullable_column = 
reinterpret_cast<vectorized::ColumnNullable*>(
+                    column_ptr->assume_mutable().get());
+            data_column = nullable_column->get_nested_column_ptr();
+            NullMap& null_map = nullable_column->get_null_map_data();
+            meta_data.emplace_back((long)null_map.data());
+        } else {
+            meta_data.emplace_back(0);
+            data_column = column_ptr->assume_mutable();
+        }
+
+        switch (logical_type) {
+#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE)                                          \
+    case NUMERIC_TYPE: {                                                                  \
+        meta_data.emplace_back(_get_numeric_data_address<CPP_NUMERIC_TYPE>(data_column)); \
+        break;                                                                            \
+    }
+            FOR_LOGICAL_NUMERIC_TYPES(DISPATCH)
+#undef DISPATCH
+        case TypeIndex::Decimal128:
+            [[fallthrough]];
+        case TypeIndex::Decimal128I: {
+            
meta_data.emplace_back(_get_decimal_data_address<Int128>(data_column));
+            break;
+        }
+        case TypeIndex::Decimal32: {
+            
meta_data.emplace_back(_get_decimal_data_address<Int32>(data_column));
+            break;
+        }
+        case TypeIndex::Decimal64: {
+            
meta_data.emplace_back(_get_decimal_data_address<Int64>(data_column));
+            break;
+        }
+        case TypeIndex::DateV2: {
+            
meta_data.emplace_back(_get_time_data_address<UInt32>(data_column));
+            break;
+        }
+        case TypeIndex::DateTimeV2: {
+            
meta_data.emplace_back(_get_time_data_address<UInt64>(data_column));
+            break;
+        }
+        case TypeIndex::String:
+            [[fallthrough]];
+        case TypeIndex::FixedString: {
+            auto& string_column = static_cast<ColumnString&>(*data_column);
+            // insert offsets
+            meta_data.emplace_back((long)string_column.get_offsets().data());
+            meta_data.emplace_back((long)string_column.get_chars().data());
+            break;
+        }
+        case TypeIndex::Array:
+            [[fallthrough]];
+        case TypeIndex::Struct:
+            [[fallthrough]];
+        case TypeIndex::Map:
+            return Status::IOError("Unhandled type {}", 
getTypeName(logical_type));
+        default:
+            return Status::IOError("Unsupported type {}", 
getTypeName(logical_type));
+        }
+    }
+
+    meta.reset(new long[meta_data.size()]);
+    memcpy(meta.get(), &meta_data[0], meta_data.size() * 8);
+    return Status::OK();
+}
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h
index bc3157ae46..7e74ecd6e3 100644
--- a/be/src/vec/exec/jni_connector.h
+++ b/be/src/vec/exec/jni_connector.h
@@ -177,6 +177,8 @@ public:
      */
     static std::string get_hive_type(const TypeDescriptor& desc);
 
+    static Status generate_meta_info(Block* block, std::unique_ptr<long[]>& 
meta);
+
 private:
     std::string _connector_class;
     std::map<std::string, std::string> _scanner_params;
@@ -233,6 +235,11 @@ private:
         return Status::OK();
     }
 
+    template <typename CppType>
+    static long _get_numeric_data_address(MutableColumnPtr& doris_column) {
+        return 
(long)static_cast<ColumnVector<CppType>&>(*doris_column).get_data().data();
+    }
+
     template <typename DecimalPrimitiveType>
     Status _fill_decimal_column(MutableColumnPtr& doris_column, 
DecimalPrimitiveType* ptr,
                                 size_t num_rows) {
@@ -245,6 +252,13 @@ private:
         return Status::OK();
     }
 
+    template <typename DecimalPrimitiveType>
+    static long _get_decimal_data_address(MutableColumnPtr& doris_column) {
+        return 
(long)static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column)
+                .get_data()
+                .data();
+    }
+
     template <typename CppType>
     Status _decode_time_column(MutableColumnPtr& doris_column, CppType* ptr, 
size_t num_rows) {
         auto& column_data = 
static_cast<ColumnVector<CppType>&>(*doris_column).get_data();
@@ -254,6 +268,11 @@ private:
         return Status::OK();
     }
 
+    template <typename CppType>
+    static long _get_time_data_address(MutableColumnPtr& doris_column) {
+        return 
(long)static_cast<ColumnVector<CppType>&>(*doris_column).get_data().data();
+    }
+
     Status _fill_string_column(MutableColumnPtr& doris_column, size_t 
num_rows);
 
     void _generate_predicates(
diff --git a/be/src/vec/exec/vjdbc_connector.cpp 
b/be/src/vec/exec/vjdbc_connector.cpp
index 572be9e733..e64db5e385 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -33,6 +33,7 @@
 #include "vec/columns/column_string.h"
 #include "vec/data_types/data_type_factory.hpp"
 #include "vec/data_types/data_type_string.h"
+#include "vec/exec/jni_connector.h"
 #include "vec/exec/scan/new_jdbc_scanner.h"
 #include "vec/functions/simple_function_factory.h"
 
@@ -41,6 +42,7 @@ namespace vectorized {
 const char* JDBC_EXECUTOR_CLASS = "org/apache/doris/udf/JdbcExecutor";
 const char* JDBC_EXECUTOR_CTOR_SIGNATURE = "([B)V";
 const char* JDBC_EXECUTOR_WRITE_SIGNATURE = "(Ljava/lang/String;)I";
+const char* JDBC_EXECUTOR_STMT_WRITE_SIGNATURE = "(Ljava/util/Map;)I";
 const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z";
 const char* JDBC_EXECUTOR_GET_BLOCK_SIGNATURE = "(I)Ljava/util/List;";
 const char* JDBC_EXECUTOR_GET_TYPES_SIGNATURE = "()Ljava/util/List;";
@@ -586,6 +588,8 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
                                 _executor_ctor_id));
     RETURN_IF_ERROR(register_id(_executor_clazz, "write", 
JDBC_EXECUTOR_WRITE_SIGNATURE,
                                 _executor_write_id));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "write", 
JDBC_EXECUTOR_STMT_WRITE_SIGNATURE,
+                                _executor_stmt_write_id));
     RETURN_IF_ERROR(register_id(_executor_clazz, "read", "()I", 
_executor_read_id));
     RETURN_IF_ERROR(register_id(_executor_clazz, "close", 
JDBC_EXECUTOR_CLOSE_SIGNATURE,
                                 _executor_close_id));
@@ -914,6 +918,61 @@ Status JdbcConnector::exec_write_sql(const std::u16string& 
insert_stmt,
     return Status::OK();
 }
 
+Status JdbcConnector::exec_stmt_write(
+        Block* block, const std::vector<vectorized::VExprContext*>& 
output_vexpr_ctxs) {
+    SCOPED_TIMER(_result_send_timer);
+    JNIEnv* env = nullptr;
+    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+
+    // prepare table schema
+    std::ostringstream required_fields;
+    std::ostringstream columns_types;
+    for (int i = 0; i < block->columns(); ++i) {
+        // The column name may be empty or contain special characters, so it is not used here.
+        // std::string field = block->get_by_position(i).name;
+        std::string type = 
JniConnector::get_hive_type(output_vexpr_ctxs[i]->root()->type());
+        if (i == 0) {
+            required_fields << "_col" << i;
+            columns_types << type;
+        } else {
+            required_fields << ","
+                            << "_col" << i;
+            columns_types << "#" << type;
+        }
+    }
+
+    // prepare table meta information
+    std::unique_ptr<long[]> meta_data;
+    RETURN_IF_ERROR(JniConnector::generate_meta_info(block, meta_data));
+    long meta_address = (long)meta_data.get();
+
+    // prepare constructor parameters
+    std::map<String, String> write_params = {{"meta_address", 
std::to_string(meta_address)},
+                                             {"required_fields", 
required_fields.str()},
+                                             {"columns_types", 
columns_types.str()},
+                                             {"write_sql", "/* todo */"}};
+    jclass hashmap_class = env->FindClass("java/util/HashMap");
+    jmethodID hashmap_constructor = env->GetMethodID(hashmap_class, "<init>", 
"(I)V");
+    jobject hashmap_object =
+            env->NewObject(hashmap_class, hashmap_constructor, 
write_params.size());
+    jmethodID hashmap_put = env->GetMethodID(
+            hashmap_class, "put", 
"(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;");
+    RETURN_ERROR_IF_EXC(env);
+    for (const auto& it : write_params) {
+        jstring key = env->NewStringUTF(it.first.c_str());
+        jstring value = env->NewStringUTF(it.second.c_str());
+        env->CallObjectMethod(hashmap_object, hashmap_put, key, value);
+        env->DeleteLocalRef(key);
+        env->DeleteLocalRef(value);
+    }
+    env->DeleteLocalRef(hashmap_class);
+    env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, 
_executor_stmt_write_id,
+                                 hashmap_object);
+    env->DeleteLocalRef(hashmap_object);
+    RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
+    return Status::OK();
+}
+
 std::string JdbcConnector::_jobject_to_string(JNIEnv* env, jobject jobj) {
     jobject jstr = env->CallObjectMethod(jobj, _to_string_id);
     auto coding = env->NewStringUTF("UTF-8");
diff --git a/be/src/vec/exec/vjdbc_connector.h 
b/be/src/vec/exec/vjdbc_connector.h
index fecedbb6fc..b5725988f9 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -67,6 +67,9 @@ public:
     Status exec_write_sql(const std::u16string& insert_stmt,
                           const fmt::memory_buffer& insert_stmt_buffer) 
override;
 
+    Status exec_stmt_write(
+            Block* block, const std::vector<vectorized::VExprContext*>& 
output_vexpr_ctxs) override;
+
     Status get_next(bool* eos, std::vector<MutableColumnPtr>& columns, Block* 
block,
                     int batch_size);
 
@@ -115,6 +118,7 @@ private:
     jobject _executor_obj;
     jmethodID _executor_ctor_id;
     jmethodID _executor_write_id;
+    jmethodID _executor_stmt_write_id;
     jmethodID _executor_read_id;
     jmethodID _executor_has_next_id;
     jmethodID _executor_block_rows_id;
diff --git a/be/src/vec/sink/vjdbc_table_sink.cpp 
b/be/src/vec/sink/vjdbc_table_sink.cpp
index 973342da73..a86f480a1e 100644
--- a/be/src/vec/sink/vjdbc_table_sink.cpp
+++ b/be/src/vec/sink/vjdbc_table_sink.cpp
@@ -45,6 +45,7 @@ Status VJdbcTableSink::init(const TDataSink& t_sink) {
     _jdbc_param.driver_checksum = t_jdbc_sink.jdbc_table.jdbc_driver_checksum;
     _jdbc_param.resource_name = t_jdbc_sink.jdbc_table.jdbc_resource_name;
     _jdbc_param.table_type = t_jdbc_sink.table_type;
+    _jdbc_param.query_string = t_jdbc_sink.insert_sql;
     _table_name = t_jdbc_sink.jdbc_table.jdbc_table_name;
     _use_transaction = t_jdbc_sink.use_transaction;
 
diff --git a/build-for-release.sh b/build-for-release.sh
index 869f9a6c9e..39ebd83a45 100755
--- a/build-for-release.sh
+++ b/build-for-release.sh
@@ -109,7 +109,7 @@ echo "Get params:
     TAR             -- ${TAR}
 "
 
-sh build.sh --clean &&
+#sh build.sh --clean &&
     USE_AVX2="${_USE_AVX2}" sh build.sh &&
     USE_AVX2="${_USE_AVX2}" sh build.sh --be --meta-tool
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
index c9a36c0439..e857cc5fc8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
@@ -92,6 +92,20 @@ public class JdbcTable extends Table {
         super(id, name, type, schema);
     }
 
+    public String getInsertSql() {
+        StringBuilder sb = new StringBuilder("INSERT INTO ");
+        
sb.append(OdbcTable.databaseProperName(TABLE_TYPE_MAP.get(getTableTypeName()), 
getExternalTableName()));
+        sb.append(" VALUES (");
+        for (int i = 0; i < getFullSchema().size(); ++i) {
+            if (i != 0) {
+                sb.append(", ");
+            }
+            sb.append("?");
+        }
+        sb.append(")");
+        return sb.toString();
+    }
+
     public String getCheckSum() {
         return checkSum;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcTableSink.java
index 5135145447..73b981b19a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcTableSink.java
@@ -44,6 +44,7 @@ public class JdbcTableSink extends DataSink {
     private final String checkSum;
     private final TOdbcTableType jdbcType;
     private final boolean useTransaction;
+    private String insertSql;
 
     public JdbcTableSink(JdbcTable jdbcTable) {
         resourceName = jdbcTable.getResourceName();
@@ -57,6 +58,7 @@ public class JdbcTableSink extends DataSink {
         driverUrl = jdbcTable.getDriverUrl();
         checkSum = jdbcTable.getCheckSum();
         dorisTableName = jdbcTable.getName();
+        insertSql = jdbcTable.getInsertSql();
     }
 
     @Override
@@ -84,6 +86,7 @@ public class JdbcTableSink extends DataSink {
         jdbcTableSink.jdbc_table.setJdbcDriverClass(driverClass);
         jdbcTableSink.jdbc_table.setJdbcDriverChecksum(checkSum);
         jdbcTableSink.jdbc_table.setJdbcResourceName(resourceName);
+        jdbcTableSink.setInsertSql(insertSql);
         jdbcTableSink.setUseTransaction(useTransaction);
         jdbcTableSink.setTableType(jdbcType);
 
diff --git 
a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java 
b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java
index 80859f809e..78ce7e1cbd 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java
@@ -72,12 +72,61 @@ public class VectorColumn {
             }
         } else if (columnType.isStringType()) {
             childColumns = new VectorColumn[1];
-            childColumns[0] = new VectorColumn(new ColumnType("#data", 
Type.BYTE), capacity * DEFAULT_STRING_LENGTH);
+            childColumns[0] = new VectorColumn(new ColumnType("#stringBytes", 
Type.BYTE),
+                    capacity * DEFAULT_STRING_LENGTH);
         }
 
         reserveCapacity(capacity);
     }
 
+    // restore the child of string column & restore meta column
+    public VectorColumn(long address, int capacity, ColumnType columnType) {
+        this.columnType = columnType;
+        this.capacity = capacity;
+        this.nullMap = 0;
+        this.data = address;
+        this.offsets = 0;
+        this.numNulls = 0;
+        this.appendIndex = capacity;
+    }
+
+    // restore block column
+    public VectorColumn(ColumnType columnType, int numRows, long 
columnMetaAddress) {
+        if (columnType.isUnsupported()) {
+            throw new RuntimeException("Unsupported type for column: " + 
columnType.getName());
+        }
+        long address = columnMetaAddress;
+        this.capacity = numRows;
+        this.columnType = columnType;
+        this.nullMap = OffHeap.getLong(null, address);
+        address += 8;
+        this.numNulls = 0;
+        if (this.nullMap != 0) {
+            for (int i = 0; i < numRows; ++i) {
+                if (isNullAt(i)) {
+                    this.numNulls++;
+                }
+            }
+        }
+        this.appendIndex = numRows;
+
+        if (columnType.isComplexType()) {
+            // todo: support complex type
+            throw new RuntimeException("Unhandled type: " + columnType);
+        } else if (columnType.isStringType()) {
+            this.offsets = OffHeap.getLong(null, address);
+            address += 8;
+            this.data = 0;
+            int length = OffHeap.getInt(null, this.offsets + (numRows - 1) * 
4L);
+            childColumns = new VectorColumn[1];
+            childColumns[0] = new VectorColumn(OffHeap.getLong(null, address), 
length,
+                    new ColumnType("#stringBytes", Type.BYTE));
+        } else {
+            this.data = OffHeap.getLong(null, address);
+            this.offsets = 0;
+        }
+    }
+
     public long nullMapAddress() {
         return nullMap;
     }
@@ -90,6 +139,10 @@ public class VectorColumn {
         return offsets;
     }
 
+    public ColumnType.Type getColumnTyp() {
+        return columnType.getType();
+    }
+
     /**
      * Release columns and meta information
      */
@@ -159,8 +212,10 @@ public class VectorColumn {
             throw new RuntimeException("Unhandled type: " + columnType);
         }
         // todo: support complex type
-        this.nullMap = OffHeap.reallocateMemory(nullMap, oldCapacity, 
newCapacity);
-        OffHeap.setMemory(nullMap + oldCapacity, (byte) 0, newCapacity - 
oldCapacity);
+        if (!"#stringBytes".equals(columnType.getName())) {
+            this.nullMap = OffHeap.reallocateMemory(nullMap, oldCapacity, 
newCapacity);
+            OffHeap.setMemory(nullMap + oldCapacity, (byte) 0, newCapacity - 
oldCapacity);
+        }
         capacity = newCapacity;
     }
 
@@ -178,7 +233,11 @@ public class VectorColumn {
     }
 
     public boolean isNullAt(int rowId) {
-        return OffHeap.getByte(null, nullMap + rowId) == 1;
+        if (nullMap == 0) {
+            return false;
+        } else {
+            return OffHeap.getByte(null, nullMap + rowId) == 1;
+        }
     }
 
     public boolean hasNull() {
diff --git 
a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorTable.java 
b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorTable.java
index 3c330bf796..3f86ebc7a2 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorTable.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorTable.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.jni.vec;
 
+import org.apache.doris.jni.utils.OffHeap;
 import org.apache.doris.jni.vec.ColumnType.Type;
 
 /**
@@ -24,12 +25,16 @@ import org.apache.doris.jni.vec.ColumnType.Type;
  */
 public class VectorTable {
     private final VectorColumn[] columns;
+    private final ColumnType[] columnTypes;
     private final String[] fields;
     private final ScanPredicate[] predicates;
     private final VectorColumn meta;
     private int numRows;
 
+    private final boolean isRestoreTable;
+
     public VectorTable(ColumnType[] types, String[] fields, ScanPredicate[] 
predicates, int capacity) {
+        this.columnTypes = types;
         this.fields = fields;
         this.columns = new VectorColumn[types.length];
         this.predicates = predicates;
@@ -40,13 +45,51 @@ public class VectorTable {
         }
         this.meta = new VectorColumn(new ColumnType("#meta", Type.BIGINT), 
metaSize);
         this.numRows = 0;
+        this.isRestoreTable = false;
+    }
+
+    public VectorTable(ColumnType[] types, String[] fields, long metaAddress) {
+        long address = metaAddress;
+        this.columnTypes = types;
+        this.fields = fields;
+        this.columns = new VectorColumn[types.length];
+        this.predicates = new ScanPredicate[0];
+
+        this.numRows = (int) OffHeap.getLong(null, address);
+        address += 8;
+        int metaSize = 1; // number of rows
+        for (int i = 0; i < types.length; i++) {
+            columns[i] = new VectorColumn(types[i], numRows, address);
+            metaSize += types[i].metaSize();
+            address += types[i].metaSize() * 8L;
+        }
+        this.meta = new VectorColumn(metaAddress, metaSize, new 
ColumnType("#meta", Type.BIGINT));
+        this.isRestoreTable = true;
     }
 
     public void appendData(int fieldId, ColumnValue o) {
+        assert (!isRestoreTable);
         columns[fieldId].appendValue(o);
     }
 
+    public VectorColumn[] getColumns() {
+        return columns;
+    }
+
+    public VectorColumn getColumn(int fieldId) {
+        return columns[fieldId];
+    }
+
+    public ColumnType[] getColumnTypes() {
+        return columnTypes;
+    }
+
+    public String[] getFields() {
+        return fields;
+    }
+
     public void releaseColumn(int fieldId) {
+        assert (!isRestoreTable);
         columns[fieldId].close();
     }
 
@@ -59,15 +102,18 @@ public class VectorTable {
     }
 
     public long getMetaAddress() {
-        meta.reset();
-        meta.appendLong(numRows);
-        for (VectorColumn c : columns) {
-            c.updateMeta(meta);
+        if (!isRestoreTable) {
+            meta.reset();
+            meta.appendLong(numRows);
+            for (VectorColumn c : columns) {
+                c.updateMeta(meta);
+            }
         }
         return meta.dataAddress();
     }
 
     public void reset() {
+        assert (!isRestoreTable);
         for (VectorColumn column : columns) {
             column.reset();
         }
@@ -75,6 +121,7 @@ public class VectorTable {
     }
 
     public void close() {
+        assert (!isRestoreTable);
         for (int i = 0; i < columns.length; i++) {
             releaseColumn(i);
         }
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java 
b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index 53a7e64cde..483fef72ea 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -17,6 +17,9 @@
 
 package org.apache.doris.udf;
 
+import org.apache.doris.jni.vec.ColumnType;
+import org.apache.doris.jni.vec.VectorColumn;
+import org.apache.doris.jni.vec.VectorTable;
 import org.apache.doris.thrift.TJdbcExecutorCtorParams;
 import org.apache.doris.thrift.TJdbcOperation;
 import org.apache.doris.thrift.TOdbcTableType;
@@ -48,6 +51,8 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Timestamp;
+import java.sql.Types;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
@@ -62,6 +67,7 @@ public class JdbcExecutor {
     private static final Logger LOG = Logger.getLogger(JdbcExecutor.class);
     private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new 
TBinaryProtocol.Factory();
     private Connection conn = null;
+    private PreparedStatement preparedStatement = null;
     private Statement stmt = null;
     private ResultSet resultSet = null;
     private ResultSetMetaData resultSetMetaData = null;
@@ -125,6 +131,135 @@ public class JdbcExecutor {
         }
     }
 
+    public int write(Map<String, String> params) throws UdfRuntimeException {
+        String[] requiredFields = params.get("required_fields").split(",");
+        String[] types = params.get("columns_types").split("#");
+        long metaAddress = Long.parseLong(params.get("meta_address"));
+        // Get sql string from configuration map
+        ColumnType[] columnTypes = new ColumnType[types.length];
+        for (int i = 0; i < types.length; i++) {
+            columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
+        }
+        VectorTable batchTable = new VectorTable(columnTypes, requiredFields, 
metaAddress);
+        // Do not release or close batchTable here; it is released by the C++ side.
+        try {
+            insert(batchTable);
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("JDBC executor sql has error: ", e);
+        }
+        return batchTable.getNumRows();
+    }
+
+    private int insert(VectorTable data) throws SQLException {
+        for (int i = 0; i < data.getNumRows(); ++i) {
+            for (int j = 0; j < data.getColumns().length; ++j) {
+                insertColumn(i, j, data.getColumns()[j]);
+            }
+            preparedStatement.addBatch();
+        }
+        preparedStatement.executeBatch();
+        preparedStatement.clearBatch();
+        return data.getNumRows();
+    }
+
+    private void insertColumn(int rowIdx, int colIdx, VectorColumn column) 
throws SQLException {
+        int parameterIndex = colIdx + 1;
+        ColumnType.Type dorisType = column.getColumnTyp();
+        if (column.isNullAt(rowIdx)) {
+            insertNullColumn(parameterIndex, dorisType);
+            return;
+        }
+        switch (dorisType) {
+            case BOOLEAN:
+                preparedStatement.setBoolean(parameterIndex, 
column.getBoolean(rowIdx));
+                break;
+            case TINYINT:
+                preparedStatement.setByte(parameterIndex, (byte) 
column.getInt(rowIdx));
+                break;
+            case SMALLINT:
+                preparedStatement.setShort(parameterIndex, (short) 
column.getInt(rowIdx));
+                break;
+            case INT:
+                preparedStatement.setInt(parameterIndex, 
column.getInt(rowIdx));
+                break;
+            case BIGINT:
+                preparedStatement.setLong(parameterIndex, 
column.getLong(rowIdx));
+                break;
+            case FLOAT:
+                preparedStatement.setFloat(parameterIndex, 
column.getFloat(rowIdx));
+                break;
+            case DOUBLE:
+                preparedStatement.setDouble(parameterIndex, 
column.getDouble(rowIdx));
+                break;
+            case DECIMALV2:
+            case DECIMAL32:
+            case DECIMAL64:
+            case DECIMAL128:
+                preparedStatement.setBigDecimal(parameterIndex, 
column.getDecimal(rowIdx));
+                break;
+            case DATEV2:
+                preparedStatement.setDate(parameterIndex, 
Date.valueOf(column.getDate(rowIdx)));
+                break;
+            case DATETIMEV2:
+                preparedStatement.setTimestamp(parameterIndex, 
Timestamp.valueOf(column.getDateTime(rowIdx)));
+                break;
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+            case BINARY:
+                preparedStatement.setString(parameterIndex, 
column.getStringWithOffset(rowIdx));
+                break;
+            default:
+                throw new RuntimeException("Unknown type value: " + dorisType);
+        }
+    }
+
+    private void insertNullColumn(int parameterIndex, ColumnType.Type 
dorisType) throws SQLException {
+        switch (dorisType) {
+            case BOOLEAN:
+                preparedStatement.setNull(parameterIndex, Types.BOOLEAN);
+                break;
+            case TINYINT:
+                preparedStatement.setNull(parameterIndex, Types.TINYINT);
+                break;
+            case SMALLINT:
+                preparedStatement.setNull(parameterIndex, Types.SMALLINT);
+                break;
+            case INT:
+                preparedStatement.setNull(parameterIndex, Types.INTEGER);
+                break;
+            case BIGINT:
+                preparedStatement.setNull(parameterIndex, Types.BIGINT);
+                break;
+            case FLOAT:
+                preparedStatement.setNull(parameterIndex, Types.FLOAT);
+                break;
+            case DOUBLE:
+                preparedStatement.setNull(parameterIndex, Types.DOUBLE);
+                break;
+            case DECIMALV2:
+            case DECIMAL32:
+            case DECIMAL64:
+            case DECIMAL128:
+                preparedStatement.setNull(parameterIndex, Types.DECIMAL);
+                break;
+            case DATEV2:
+                preparedStatement.setNull(parameterIndex, Types.DATE);
+                break;
+            case DATETIMEV2:
+                preparedStatement.setNull(parameterIndex, Types.TIMESTAMP);
+                break;
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+            case BINARY:
+                preparedStatement.setNull(parameterIndex, Types.VARCHAR);
+                break;
+            default:
+                throw new RuntimeException("Unknown type value: " + dorisType);
+        }
+    }
+
     public List<String> getResultColumnTypeNames() {
         return resultColumnTypeNames;
     }
@@ -272,7 +407,12 @@ public class JdbcExecutor {
                 }
                 batchSizeNum = batchSize;
             } else {
-                stmt = conn.createStatement();
+                if (tableType == TOdbcTableType.ORACLE) {
+                    LOG.info("insert sql: " + sql);
+                    preparedStatement = conn.prepareStatement(sql);
+                } else {
+                    stmt = conn.createStatement();
+                }
             }
         } catch (MalformedURLException e) {
             throw new UdfRuntimeException("MalformedURLException to load class about " + driverUrl, e);
diff --git a/fe/java-udf/src/test/java/org/apache/doris/jni/JniScannerTest.java b/fe/java-udf/src/test/java/org/apache/doris/jni/JniScannerTest.java
index 5d6a0dfc68..32f4cccd79 100644
--- a/fe/java-udf/src/test/java/org/apache/doris/jni/JniScannerTest.java
+++ b/fe/java-udf/src/test/java/org/apache/doris/jni/JniScannerTest.java
@@ -18,6 +18,7 @@
 package org.apache.doris.jni;
 
 import org.apache.doris.jni.utils.OffHeap;
+import org.apache.doris.jni.vec.VectorTable;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -38,20 +39,22 @@ public class JniScannerTest {
                         + "date#timestamp#char(10)#varchar(10)#string#decimalv2(12,4)#decimal64(10,3)");
             }
         });
-        StringBuilder result = new StringBuilder();
         scanner.open();
         long metaAddress = 0;
         do {
             metaAddress = scanner.getNextBatchMeta();
             if (metaAddress != 0) {
-                result.append(scanner.getTable().dump(32));
                 long rows = OffHeap.getLong(null, metaAddress);
                 Assert.assertEquals(32, rows);
+
+                VectorTable restoreTable = new VectorTable(scanner.getTable().getColumnTypes(),
+                        scanner.getTable().getFields(), metaAddress);
+                System.out.println(restoreTable.dump((int) rows));
+                // Restored table is released by the original table.
             }
             scanner.resetTable();
         } while (metaAddress != 0);
         scanner.releaseTable();
         scanner.close();
-        System.out.print(result);
     }
 }
diff --git a/gensrc/script/gen_build_version.sh b/gensrc/script/gen_build_version.sh
index de749bfb97..e77e5cb931 100755
--- a/gensrc/script/gen_build_version.sh
+++ b/gensrc/script/gen_build_version.sh
@@ -30,7 +30,7 @@ set -eo pipefail
 build_version_prefix="doris"
 build_version_major=2
 build_version_minor=0
-build_version_patch=1
+build_version_patch=2
 build_version_rc_version="cicc"
 
 build_version="${build_version_prefix}-${build_version_major}.${build_version_minor}.${build_version_patch}-${build_version_rc_version}"
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 6a5a6e1944..04f3ba3e60 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -156,6 +156,7 @@ struct TJdbcTableSink {
     1: optional Descriptors.TJdbcTable jdbc_table
     2: optional bool use_transaction
     3: optional Types.TOdbcTableType table_type
+    4: optional string insert_sql
 }
 
 struct TExportSink {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to