This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch doris-for-zhongjin in repository https://gitbox.apache.org/repos/asf/doris.git
commit b53ccafb8998556a85818f5074b23993572d811d Author: gaoxin <ashin...@outlook.com> AuthorDate: Tue Apr 11 19:51:26 2023 +0800 [feature](jni) map c++ block to java vector table and support oracle insert 2.0.2 --- be/src/common/config.h | 2 + be/src/exec/odbc_connector.h | 6 + be/src/exec/table_connector.cpp | 8 +- be/src/exec/table_connector.h | 4 + be/src/vec/exec/jni_connector.cpp | 78 +++++++++++ be/src/vec/exec/jni_connector.h | 19 +++ be/src/vec/exec/vjdbc_connector.cpp | 59 +++++++++ be/src/vec/exec/vjdbc_connector.h | 4 + be/src/vec/sink/vjdbc_table_sink.cpp | 1 + build-for-release.sh | 2 +- .../java/org/apache/doris/catalog/JdbcTable.java | 14 ++ .../org/apache/doris/planner/JdbcTableSink.java | 3 + .../org/apache/doris/jni/vec/VectorColumn.java | 67 +++++++++- .../java/org/apache/doris/jni/vec/VectorTable.java | 55 +++++++- .../java/org/apache/doris/udf/JdbcExecutor.java | 142 ++++++++++++++++++++- .../java/org/apache/doris/jni/JniScannerTest.java | 9 +- gensrc/script/gen_build_version.sh | 2 +- gensrc/thrift/DataSinks.thrift | 1 + 18 files changed, 461 insertions(+), 15 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 63d785b41c..e65c052014 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -936,6 +936,8 @@ CONF_mInt64(max_tablet_io_errors, "-1"); // Page size of row column, default 4KB CONF_mInt64(row_column_page_size, "4096"); +CONF_mBool(enable_new_oracle_insert, "true"); + #ifdef BE_TEST // test s3 CONF_String(test_s3_resource, "resource"); diff --git a/be/src/exec/odbc_connector.h b/be/src/exec/odbc_connector.h index f7765a9fce..bd76c9dd9d 100644 --- a/be/src/exec/odbc_connector.h +++ b/be/src/exec/odbc_connector.h @@ -60,6 +60,12 @@ public: Status exec_write_sql(const std::u16string& insert_stmt, const fmt::memory_buffer& insert_stmt_buffer) override; + Status exec_stmt_write( + vectorized::Block* block, + const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs) override { + return 
Status::OK(); + } + // use in ODBC transaction Status begin_trans() override; // should be call after connect and before query or init_to_write Status abort_trans() override; // should be call after transaction abort diff --git a/be/src/exec/table_connector.cpp b/be/src/exec/table_connector.cpp index 11e60d76cf..7855c0badf 100644 --- a/be/src/exec/table_connector.cpp +++ b/be/src/exec/table_connector.cpp @@ -22,6 +22,7 @@ #include <glog/logging.h> #include <iconv.h> +#include "common/config.h" #include "runtime/define_primitive_type.h" #include "runtime/primitive_type.h" #include "util/mysql_global.h" @@ -134,7 +135,12 @@ Status TableConnector::append(const std::string& table_name, vectorized::Block* insert_stmt = utf8_to_u16string(_insert_stmt_buffer.data(), _insert_stmt_buffer.data() + _insert_stmt_buffer.size()); } - RETURN_IF_ERROR(exec_write_sql(insert_stmt, _insert_stmt_buffer)); + + if (config::enable_new_oracle_insert && table_type == TOdbcTableType::ORACLE) { + RETURN_IF_ERROR(exec_stmt_write(block, output_vexpr_ctxs)); + } else { + RETURN_IF_ERROR(exec_write_sql(insert_stmt, _insert_stmt_buffer)); + } COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent); return Status::OK(); } diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h index 872aa7fe14..0ecb8462ea 100644 --- a/be/src/exec/table_connector.h +++ b/be/src/exec/table_connector.h @@ -46,6 +46,10 @@ public: virtual Status abort_trans() = 0; // should be call after transaction abort virtual Status finish_trans() = 0; // should be call after transaction commit + virtual Status exec_stmt_write( + vectorized::Block* block, + const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs) = 0; + virtual Status exec_write_sql(const std::u16string& insert_stmt, const fmt::memory_buffer& _insert_stmt_buffer) = 0; diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index fc94debe84..80c4f4eb8a 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ 
b/be/src/vec/exec/jni_connector.cpp @@ -329,4 +329,82 @@ std::string JniConnector::get_hive_type(const TypeDescriptor& desc) { return "unsupported"; } } + +Status JniConnector::generate_meta_info(Block* block, std::unique_ptr<long[]>& meta) { + std::vector<long> meta_data; + // insert number of rows + meta_data.emplace_back(block->rows()); + for (int i = 0; i < block->columns(); ++i) { + auto& column_with_type_and_name = block->get_by_position(i); + auto& column_ptr = column_with_type_and_name.column; + auto& column_type = column_with_type_and_name.type; + TypeIndex logical_type = remove_nullable(column_type)->get_type_id(); + + // insert null map address + MutableColumnPtr data_column; + if (column_ptr->is_nullable()) { + auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( + column_ptr->assume_mutable().get()); + data_column = nullable_column->get_nested_column_ptr(); + NullMap& null_map = nullable_column->get_null_map_data(); + meta_data.emplace_back((long)null_map.data()); + } else { + meta_data.emplace_back(0); + data_column = column_ptr->assume_mutable(); + } + + switch (logical_type) { +#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE) \ + case NUMERIC_TYPE: { \ + meta_data.emplace_back(_get_numeric_data_address<CPP_NUMERIC_TYPE>(data_column)); \ + break; \ + } + FOR_LOGICAL_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + case TypeIndex::Decimal128: + [[fallthrough]]; + case TypeIndex::Decimal128I: { + meta_data.emplace_back(_get_decimal_data_address<Int128>(data_column)); + break; + } + case TypeIndex::Decimal32: { + meta_data.emplace_back(_get_decimal_data_address<Int32>(data_column)); + break; + } + case TypeIndex::Decimal64: { + meta_data.emplace_back(_get_decimal_data_address<Int64>(data_column)); + break; + } + case TypeIndex::DateV2: { + meta_data.emplace_back(_get_time_data_address<UInt32>(data_column)); + break; + } + case TypeIndex::DateTimeV2: { + meta_data.emplace_back(_get_time_data_address<UInt64>(data_column)); + break; + } + case 
TypeIndex::String: + [[fallthrough]]; + case TypeIndex::FixedString: { + auto& string_column = static_cast<ColumnString&>(*data_column); + // insert offsets + meta_data.emplace_back((long)string_column.get_offsets().data()); + meta_data.emplace_back((long)string_column.get_chars().data()); + break; + } + case TypeIndex::Array: + [[fallthrough]]; + case TypeIndex::Struct: + [[fallthrough]]; + case TypeIndex::Map: + return Status::IOError("Unhandled type {}", getTypeName(logical_type)); + default: + return Status::IOError("Unsupported type {}", getTypeName(logical_type)); + } + } + + meta.reset(new long[meta_data.size()]); + memcpy(meta.get(), &meta_data[0], meta_data.size() * 8); + return Status::OK(); +} } // namespace doris::vectorized diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h index bc3157ae46..7e74ecd6e3 100644 --- a/be/src/vec/exec/jni_connector.h +++ b/be/src/vec/exec/jni_connector.h @@ -177,6 +177,8 @@ public: */ static std::string get_hive_type(const TypeDescriptor& desc); + static Status generate_meta_info(Block* block, std::unique_ptr<long[]>& meta); + private: std::string _connector_class; std::map<std::string, std::string> _scanner_params; @@ -233,6 +235,11 @@ private: return Status::OK(); } + template <typename CppType> + static long _get_numeric_data_address(MutableColumnPtr& doris_column) { + return (long)static_cast<ColumnVector<CppType>&>(*doris_column).get_data().data(); + } + template <typename DecimalPrimitiveType> Status _fill_decimal_column(MutableColumnPtr& doris_column, DecimalPrimitiveType* ptr, size_t num_rows) { @@ -245,6 +252,13 @@ private: return Status::OK(); } + template <typename DecimalPrimitiveType> + static long _get_decimal_data_address(MutableColumnPtr& doris_column) { + return (long)static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column) + .get_data() + .data(); + } + template <typename CppType> Status _decode_time_column(MutableColumnPtr& doris_column, CppType* ptr, size_t
num_rows) { auto& column_data = static_cast<ColumnVector<CppType>&>(*doris_column).get_data(); @@ -254,6 +268,11 @@ private: return Status::OK(); } + template <typename CppType> + static long _get_time_data_address(MutableColumnPtr& doris_column) { + return (long)static_cast<ColumnVector<CppType>&>(*doris_column).get_data().data(); + } + Status _fill_string_column(MutableColumnPtr& doris_column, size_t num_rows); void _generate_predicates( diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index 572be9e733..e64db5e385 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -33,6 +33,7 @@ #include "vec/columns/column_string.h" #include "vec/data_types/data_type_factory.hpp" #include "vec/data_types/data_type_string.h" +#include "vec/exec/jni_connector.h" #include "vec/exec/scan/new_jdbc_scanner.h" #include "vec/functions/simple_function_factory.h" @@ -41,6 +42,7 @@ namespace vectorized { const char* JDBC_EXECUTOR_CLASS = "org/apache/doris/udf/JdbcExecutor"; const char* JDBC_EXECUTOR_CTOR_SIGNATURE = "([B)V"; const char* JDBC_EXECUTOR_WRITE_SIGNATURE = "(Ljava/lang/String;)I"; +const char* JDBC_EXECUTOR_STMT_WRITE_SIGNATURE = "(Ljava/util/Map;)I"; const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z"; const char* JDBC_EXECUTOR_GET_BLOCK_SIGNATURE = "(I)Ljava/util/List;"; const char* JDBC_EXECUTOR_GET_TYPES_SIGNATURE = "()Ljava/util/List;"; @@ -586,6 +588,8 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) { _executor_ctor_id)); RETURN_IF_ERROR(register_id(_executor_clazz, "write", JDBC_EXECUTOR_WRITE_SIGNATURE, _executor_write_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "write", JDBC_EXECUTOR_STMT_WRITE_SIGNATURE, + _executor_stmt_write_id)); RETURN_IF_ERROR(register_id(_executor_clazz, "read", "()I", _executor_read_id)); RETURN_IF_ERROR(register_id(_executor_clazz, "close", JDBC_EXECUTOR_CLOSE_SIGNATURE, _executor_close_id)); @@ -914,6 +918,61 @@ Status 
JdbcConnector::exec_write_sql(const std::u16string& insert_stmt, return Status::OK(); } +Status JdbcConnector::exec_stmt_write( + Block* block, const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs) { + SCOPED_TIMER(_result_send_timer); + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + + // prepare table schema + std::ostringstream required_fields; + std::ostringstream columns_types; + for (int i = 0; i < block->columns(); ++i) { + // column name maybe empty or has special characters + // std::string field = block->get_by_position(i).name; + std::string type = JniConnector::get_hive_type(output_vexpr_ctxs[i]->root()->type()); + if (i == 0) { + required_fields << "_col" << i; + columns_types << type; + } else { + required_fields << "," + << "_col" << i; + columns_types << "#" << type; + } + } + + // prepare table meta information + std::unique_ptr<long[]> meta_data; + RETURN_IF_ERROR(JniConnector::generate_meta_info(block, meta_data)); + long meta_address = (long)meta_data.get(); + + // prepare constructor parameters + std::map<String, String> write_params = {{"meta_address", std::to_string(meta_address)}, + {"required_fields", required_fields.str()}, + {"columns_types", columns_types.str()}, + {"write_sql", "/* todo */"}}; + jclass hashmap_class = env->FindClass("java/util/HashMap"); + jmethodID hashmap_constructor = env->GetMethodID(hashmap_class, "<init>", "(I)V"); + jobject hashmap_object = + env->NewObject(hashmap_class, hashmap_constructor, write_params.size()); + jmethodID hashmap_put = env->GetMethodID( + hashmap_class, "put", "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;"); + RETURN_ERROR_IF_EXC(env); + for (const auto& it : write_params) { + jstring key = env->NewStringUTF(it.first.c_str()); + jstring value = env->NewStringUTF(it.second.c_str()); + env->CallObjectMethod(hashmap_object, hashmap_put, key, value); + env->DeleteLocalRef(key); + env->DeleteLocalRef(value); + } + env->DeleteLocalRef(hashmap_class); + 
env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_stmt_write_id, + hashmap_object); + env->DeleteLocalRef(hashmap_object); + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); + return Status::OK(); +} + std::string JdbcConnector::_jobject_to_string(JNIEnv* env, jobject jobj) { jobject jstr = env->CallObjectMethod(jobj, _to_string_id); auto coding = env->NewStringUTF("UTF-8"); diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index fecedbb6fc..b5725988f9 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -67,6 +67,9 @@ public: Status exec_write_sql(const std::u16string& insert_stmt, const fmt::memory_buffer& insert_stmt_buffer) override; + Status exec_stmt_write( + Block* block, const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs) override; + Status get_next(bool* eos, std::vector<MutableColumnPtr>& columns, Block* block, int batch_size); @@ -115,6 +118,7 @@ private: jobject _executor_obj; jmethodID _executor_ctor_id; jmethodID _executor_write_id; + jmethodID _executor_stmt_write_id; jmethodID _executor_read_id; jmethodID _executor_has_next_id; jmethodID _executor_block_rows_id; diff --git a/be/src/vec/sink/vjdbc_table_sink.cpp b/be/src/vec/sink/vjdbc_table_sink.cpp index 973342da73..a86f480a1e 100644 --- a/be/src/vec/sink/vjdbc_table_sink.cpp +++ b/be/src/vec/sink/vjdbc_table_sink.cpp @@ -45,6 +45,7 @@ Status VJdbcTableSink::init(const TDataSink& t_sink) { _jdbc_param.driver_checksum = t_jdbc_sink.jdbc_table.jdbc_driver_checksum; _jdbc_param.resource_name = t_jdbc_sink.jdbc_table.jdbc_resource_name; _jdbc_param.table_type = t_jdbc_sink.table_type; + _jdbc_param.query_string = t_jdbc_sink.insert_sql; _table_name = t_jdbc_sink.jdbc_table.jdbc_table_name; _use_transaction = t_jdbc_sink.use_transaction; diff --git a/build-for-release.sh b/build-for-release.sh index 869f9a6c9e..39ebd83a45 100755 --- a/build-for-release.sh +++ b/build-for-release.sh @@ -109,7 +109,7 
@@ echo "Get params: TAR -- ${TAR} " -sh build.sh --clean && +#sh build.sh --clean && USE_AVX2="${_USE_AVX2}" sh build.sh && USE_AVX2="${_USE_AVX2}" sh build.sh --be --meta-tool diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java index c9a36c0439..e857cc5fc8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java @@ -92,6 +92,20 @@ public class JdbcTable extends Table { super(id, name, type, schema); } + public String getInsertSql() { + StringBuilder sb = new StringBuilder("INSERT INTO "); + sb.append(OdbcTable.databaseProperName(TABLE_TYPE_MAP.get(getTableTypeName()), getExternalTableName())); + sb.append(" VALUES ("); + for (int i = 0; i < getFullSchema().size(); ++i) { + if (i != 0) { + sb.append(", "); + } + sb.append("?"); + } + sb.append(")"); + return sb.toString(); + } + public String getCheckSum() { return checkSum; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcTableSink.java index 5135145447..73b981b19a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcTableSink.java @@ -44,6 +44,7 @@ public class JdbcTableSink extends DataSink { private final String checkSum; private final TOdbcTableType jdbcType; private final boolean useTransaction; + private String insertSql; public JdbcTableSink(JdbcTable jdbcTable) { resourceName = jdbcTable.getResourceName(); @@ -57,6 +58,7 @@ public class JdbcTableSink extends DataSink { driverUrl = jdbcTable.getDriverUrl(); checkSum = jdbcTable.getCheckSum(); dorisTableName = jdbcTable.getName(); + insertSql = jdbcTable.getInsertSql(); } @Override @@ -84,6 +86,7 @@ public class JdbcTableSink extends DataSink { 
jdbcTableSink.jdbc_table.setJdbcDriverClass(driverClass); jdbcTableSink.jdbc_table.setJdbcDriverChecksum(checkSum); jdbcTableSink.jdbc_table.setJdbcResourceName(resourceName); + jdbcTableSink.setInsertSql(insertSql); jdbcTableSink.setUseTransaction(useTransaction); jdbcTableSink.setTableType(jdbcType); diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java index 80859f809e..78ce7e1cbd 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java +++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java @@ -72,12 +72,61 @@ public class VectorColumn { } } else if (columnType.isStringType()) { childColumns = new VectorColumn[1]; - childColumns[0] = new VectorColumn(new ColumnType("#data", Type.BYTE), capacity * DEFAULT_STRING_LENGTH); + childColumns[0] = new VectorColumn(new ColumnType("#stringBytes", Type.BYTE), + capacity * DEFAULT_STRING_LENGTH); } reserveCapacity(capacity); } + // restore the child of string column & restore meta column + public VectorColumn(long address, int capacity, ColumnType columnType) { + this.columnType = columnType; + this.capacity = capacity; + this.nullMap = 0; + this.data = address; + this.offsets = 0; + this.numNulls = 0; + this.appendIndex = capacity; + } + + // restore block column + public VectorColumn(ColumnType columnType, int numRows, long columnMetaAddress) { + if (columnType.isUnsupported()) { + throw new RuntimeException("Unsupported type for column: " + columnType.getName()); + } + long address = columnMetaAddress; + this.capacity = numRows; + this.columnType = columnType; + this.nullMap = OffHeap.getLong(null, address); + address += 8; + this.numNulls = 0; + if (this.nullMap != 0) { + for (int i = 0; i < numRows; ++i) { + if (isNullAt(i)) { + this.numNulls++; + } + } + } + this.appendIndex = numRows; + + if (columnType.isComplexType()) { + // todo: support complex type + throw new 
RuntimeException("Unhandled type: " + columnType); + } else if (columnType.isStringType()) { + this.offsets = OffHeap.getLong(null, address); + address += 8; + this.data = 0; + int length = OffHeap.getInt(null, this.offsets + (numRows - 1) * 4L); + childColumns = new VectorColumn[1]; + childColumns[0] = new VectorColumn(OffHeap.getLong(null, address), length, + new ColumnType("#stringBytes", Type.BYTE)); + } else { + this.data = OffHeap.getLong(null, address); + this.offsets = 0; + } + } + public long nullMapAddress() { return nullMap; } @@ -90,6 +139,10 @@ public class VectorColumn { return offsets; } + public ColumnType.Type getColumnTyp() { + return columnType.getType(); + } + /** * Release columns and meta information */ @@ -159,8 +212,10 @@ public class VectorColumn { throw new RuntimeException("Unhandled type: " + columnType); } // todo: support complex type - this.nullMap = OffHeap.reallocateMemory(nullMap, oldCapacity, newCapacity); - OffHeap.setMemory(nullMap + oldCapacity, (byte) 0, newCapacity - oldCapacity); + if (!"#stringBytes".equals(columnType.getName())) { + this.nullMap = OffHeap.reallocateMemory(nullMap, oldCapacity, newCapacity); + OffHeap.setMemory(nullMap + oldCapacity, (byte) 0, newCapacity - oldCapacity); + } capacity = newCapacity; } @@ -178,7 +233,11 @@ public class VectorColumn { } public boolean isNullAt(int rowId) { - return OffHeap.getByte(null, nullMap + rowId) == 1; + if (nullMap == 0) { + return false; + } else { + return OffHeap.getByte(null, nullMap + rowId) == 1; + } } public boolean hasNull() { diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorTable.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorTable.java index 3c330bf796..3f86ebc7a2 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorTable.java +++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorTable.java @@ -17,6 +17,7 @@ package org.apache.doris.jni.vec; +import org.apache.doris.jni.utils.OffHeap; import 
org.apache.doris.jni.vec.ColumnType.Type; /** @@ -24,12 +25,16 @@ import org.apache.doris.jni.vec.ColumnType.Type; */ public class VectorTable { private final VectorColumn[] columns; + private final ColumnType[] columnTypes; private final String[] fields; private final ScanPredicate[] predicates; private final VectorColumn meta; private int numRows; + private final boolean isRestoreTable; + public VectorTable(ColumnType[] types, String[] fields, ScanPredicate[] predicates, int capacity) { + this.columnTypes = types; this.fields = fields; this.columns = new VectorColumn[types.length]; this.predicates = predicates; @@ -40,13 +45,51 @@ public class VectorTable { } this.meta = new VectorColumn(new ColumnType("#meta", Type.BIGINT), metaSize); this.numRows = 0; + this.isRestoreTable = false; + } + + public VectorTable(ColumnType[] types, String[] fields, long metaAddress) { + long address = metaAddress; + this.columnTypes = types; + this.fields = fields; + this.columns = new VectorColumn[types.length]; + this.predicates = new ScanPredicate[0]; + + this.numRows = (int) OffHeap.getLong(null, address); + address += 8; + int metaSize = 1; // number of rows + for (int i = 0; i < types.length; i++) { + columns[i] = new VectorColumn(types[i], numRows, address); + metaSize += types[i].metaSize(); + address += types[i].metaSize() * 8L; + } + this.meta = new VectorColumn(metaAddress, metaSize, new ColumnType("#meta", Type.BIGINT)); + this.isRestoreTable = true; } public void appendData(int fieldId, ColumnValue o) { + assert (!isRestoreTable); columns[fieldId].appendValue(o); } + public VectorColumn[] getColumns() { + return columns; + } + + public VectorColumn getColumn(int fieldId) { + return columns[fieldId]; + } + + public ColumnType[] getColumnTypes() { + return columnTypes; + } + + public String[] getFields() { + return fields; + } + public void releaseColumn(int fieldId) { + assert (!isRestoreTable); columns[fieldId].close(); } @@ -59,15 +102,18 @@ public class VectorTable { 
} public long getMetaAddress() { - meta.reset(); - meta.appendLong(numRows); - for (VectorColumn c : columns) { - c.updateMeta(meta); + if (!isRestoreTable) { + meta.reset(); + meta.appendLong(numRows); + for (VectorColumn c : columns) { + c.updateMeta(meta); + } } return meta.dataAddress(); } public void reset() { + assert (!isRestoreTable); for (VectorColumn column : columns) { column.reset(); } @@ -75,6 +121,7 @@ public class VectorTable { } public void close() { + assert (!isRestoreTable); for (int i = 0; i < columns.length; i++) { releaseColumn(i); } diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java index 53a7e64cde..483fef72ea 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java @@ -17,6 +17,9 @@ package org.apache.doris.udf; +import org.apache.doris.jni.vec.ColumnType; +import org.apache.doris.jni.vec.VectorColumn; +import org.apache.doris.jni.vec.VectorTable; import org.apache.doris.thrift.TJdbcExecutorCtorParams; import org.apache.doris.thrift.TJdbcOperation; import org.apache.doris.thrift.TOdbcTableType; @@ -48,6 +51,8 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Timestamp; +import java.sql.Types; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; @@ -62,6 +67,7 @@ public class JdbcExecutor { private static final Logger LOG = Logger.getLogger(JdbcExecutor.class); private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new TBinaryProtocol.Factory(); private Connection conn = null; + private PreparedStatement preparedStatement = null; private Statement stmt = null; private ResultSet resultSet = null; private ResultSetMetaData resultSetMetaData = null; @@ -125,6 +131,135 @@ public class JdbcExecutor { } } + public int 
write(Map<String, String> params) throws UdfRuntimeException { + String[] requiredFields = params.get("required_fields").split(","); + String[] types = params.get("columns_types").split("#"); + long metaAddress = Long.parseLong(params.get("meta_address")); + // Get sql string from configuration map + ColumnType[] columnTypes = new ColumnType[types.length]; + for (int i = 0; i < types.length; i++) { + columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]); + } + VectorTable batchTable = new VectorTable(columnTypes, requiredFields, metaAddress); + // Can't release or close batchTable, it's released by c++ + try { + insert(batchTable); + } catch (SQLException e) { + throw new UdfRuntimeException("JDBC executor sql has error: ", e); + } + return batchTable.getNumRows(); + } + + private int insert(VectorTable data) throws SQLException { + for (int i = 0; i < data.getNumRows(); ++i) { + for (int j = 0; j < data.getColumns().length; ++j) { + insertColumn(i, j, data.getColumns()[j]); + } + preparedStatement.addBatch(); + } + preparedStatement.executeBatch(); + preparedStatement.clearBatch(); + return data.getNumRows(); + } + + private void insertColumn(int rowIdx, int colIdx, VectorColumn column) throws SQLException { + int parameterIndex = colIdx + 1; + ColumnType.Type dorisType = column.getColumnTyp(); + if (column.isNullAt(rowIdx)) { + insertNullColumn(parameterIndex, dorisType); + return; + } + switch (dorisType) { + case BOOLEAN: + preparedStatement.setBoolean(parameterIndex, column.getBoolean(rowIdx)); + break; + case TINYINT: + preparedStatement.setByte(parameterIndex, (byte) column.getInt(rowIdx)); + break; + case SMALLINT: + preparedStatement.setShort(parameterIndex, (short) column.getInt(rowIdx)); + break; + case INT: + preparedStatement.setInt(parameterIndex, column.getInt(rowIdx)); + break; + case BIGINT: + preparedStatement.setLong(parameterIndex, column.getLong(rowIdx)); + break; + case FLOAT: + preparedStatement.setFloat(parameterIndex, 
column.getFloat(rowIdx)); + break; + case DOUBLE: + preparedStatement.setDouble(parameterIndex, column.getDouble(rowIdx)); + break; + case DECIMALV2: + case DECIMAL32: + case DECIMAL64: + case DECIMAL128: + preparedStatement.setBigDecimal(parameterIndex, column.getDecimal(rowIdx)); + break; + case DATEV2: + preparedStatement.setDate(parameterIndex, Date.valueOf(column.getDate(rowIdx))); + break; + case DATETIMEV2: + preparedStatement.setTimestamp(parameterIndex, Timestamp.valueOf(column.getDateTime(rowIdx))); + break; + case CHAR: + case VARCHAR: + case STRING: + case BINARY: + preparedStatement.setString(parameterIndex, column.getStringWithOffset(rowIdx)); + break; + default: + throw new RuntimeException("Unknown type value: " + dorisType); + } + } + + private void insertNullColumn(int parameterIndex, ColumnType.Type dorisType) throws SQLException { + switch (dorisType) { + case BOOLEAN: + preparedStatement.setNull(parameterIndex, Types.BOOLEAN); + break; + case TINYINT: + preparedStatement.setNull(parameterIndex, Types.TINYINT); + break; + case SMALLINT: + preparedStatement.setNull(parameterIndex, Types.SMALLINT); + break; + case INT: + preparedStatement.setNull(parameterIndex, Types.INTEGER); + break; + case BIGINT: + preparedStatement.setNull(parameterIndex, Types.BIGINT); + break; + case FLOAT: + preparedStatement.setNull(parameterIndex, Types.FLOAT); + break; + case DOUBLE: + preparedStatement.setNull(parameterIndex, Types.DOUBLE); + break; + case DECIMALV2: + case DECIMAL32: + case DECIMAL64: + case DECIMAL128: + preparedStatement.setNull(parameterIndex, Types.DECIMAL); + break; + case DATEV2: + preparedStatement.setNull(parameterIndex, Types.DATE); + break; + case DATETIMEV2: + preparedStatement.setNull(parameterIndex, Types.TIMESTAMP); + break; + case CHAR: + case VARCHAR: + case STRING: + case BINARY: + preparedStatement.setNull(parameterIndex, Types.VARCHAR); + break; + default: + throw new RuntimeException("Unknown type value: " + dorisType); + } + } + 
public List<String> getResultColumnTypeNames() { return resultColumnTypeNames; } @@ -272,7 +407,12 @@ public class JdbcExecutor { } batchSizeNum = batchSize; } else { - stmt = conn.createStatement(); + if (tableType == TOdbcTableType.ORACLE) { + LOG.info("insert sql: " + sql); + preparedStatement = conn.prepareStatement(sql); + } else { + stmt = conn.createStatement(); + } } } catch (MalformedURLException e) { throw new UdfRuntimeException("MalformedURLException to load class about " + driverUrl, e); diff --git a/fe/java-udf/src/test/java/org/apache/doris/jni/JniScannerTest.java b/fe/java-udf/src/test/java/org/apache/doris/jni/JniScannerTest.java index 5d6a0dfc68..32f4cccd79 100644 --- a/fe/java-udf/src/test/java/org/apache/doris/jni/JniScannerTest.java +++ b/fe/java-udf/src/test/java/org/apache/doris/jni/JniScannerTest.java @@ -18,6 +18,7 @@ package org.apache.doris.jni; import org.apache.doris.jni.utils.OffHeap; +import org.apache.doris.jni.vec.VectorTable; import org.junit.Assert; import org.junit.Test; @@ -38,20 +39,22 @@ public class JniScannerTest { + "date#timestamp#char(10)#varchar(10)#string#decimalv2(12,4)#decimal64(10,3)"); } }); - StringBuilder result = new StringBuilder(); scanner.open(); long metaAddress = 0; do { metaAddress = scanner.getNextBatchMeta(); if (metaAddress != 0) { - result.append(scanner.getTable().dump(32)); long rows = OffHeap.getLong(null, metaAddress); Assert.assertEquals(32, rows); + + VectorTable restoreTable = new VectorTable(scanner.getTable().getColumnTypes(), + scanner.getTable().getFields(), metaAddress); + System.out.println(restoreTable.dump((int) rows)); + // Restored table is released by the original table.
} scanner.resetTable(); } while (metaAddress != 0); scanner.releaseTable(); scanner.close(); - System.out.print(result); } } diff --git a/gensrc/script/gen_build_version.sh b/gensrc/script/gen_build_version.sh index de749bfb97..e77e5cb931 100755 --- a/gensrc/script/gen_build_version.sh +++ b/gensrc/script/gen_build_version.sh @@ -30,7 +30,7 @@ set -eo pipefail build_version_prefix="doris" build_version_major=2 build_version_minor=0 -build_version_patch=1 +build_version_patch=2 build_version_rc_version="cicc" build_version="${build_version_prefix}-${build_version_major}.${build_version_minor}.${build_version_patch}-${build_version_rc_version}" diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 6a5a6e1944..04f3ba3e60 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -156,6 +156,7 @@ struct TJdbcTableSink { 1: optional Descriptors.TJdbcTable jdbc_table 2: optional bool use_transaction 3: optional Types.TOdbcTableType table_type + 4: optional string insert_sql } struct TExportSink { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org