morningman commented on code in PR #61141:
URL: https://github.com/apache/doris/pull/61141#discussion_r2933230218


##########
be/src/exec/sink/writer/vjdbc_table_writer.cpp:
##########
@@ -15,67 +15,100 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exec/sink/writer/vjdbc_table_writer.h"
+#include "vjdbc_table_writer.h"
 
 #include <gen_cpp/DataSinks_types.h>
 #include <stdint.h>
 
 #include <sstream>
 
-#include "core/binary_cast.hpp"
+#include "common/logging.h"
 #include "core/block/block.h"
 #include "exprs/vexpr.h"
 #include "exprs/vexpr_context.h"
+#include "runtime/runtime_state.h"
+#include "util/jdbc_utils.h"
 
 namespace doris {
 
-JdbcConnectorParam VJdbcTableWriter::create_connect_param(const doris::TDataSink& t_sink) {
+std::map<std::string, std::string> VJdbcTableWriter::_build_writer_params(const TDataSink& t_sink) {
     const TJdbcTableSink& t_jdbc_sink = t_sink.jdbc_table_sink;
+    std::map<std::string, std::string> params;
 
-    JdbcConnectorParam jdbc_param;
-
-    jdbc_param.catalog_id = t_jdbc_sink.jdbc_table.catalog_id;
-    jdbc_param.jdbc_url = t_jdbc_sink.jdbc_table.jdbc_url;
-    jdbc_param.user = t_jdbc_sink.jdbc_table.jdbc_user;
-    jdbc_param.passwd = t_jdbc_sink.jdbc_table.jdbc_password;
-    jdbc_param.driver_class = t_jdbc_sink.jdbc_table.jdbc_driver_class;
-    jdbc_param.driver_path = t_jdbc_sink.jdbc_table.jdbc_driver_url;
-    jdbc_param.driver_checksum = t_jdbc_sink.jdbc_table.jdbc_driver_checksum;
-    jdbc_param.resource_name = t_jdbc_sink.jdbc_table.jdbc_resource_name;
-    jdbc_param.table_type = t_jdbc_sink.table_type;
-    jdbc_param.query_string = t_jdbc_sink.insert_sql;
-    jdbc_param.table_name = t_jdbc_sink.jdbc_table.jdbc_table_name;
-    jdbc_param.use_transaction = t_jdbc_sink.use_transaction;
-    jdbc_param.connection_pool_min_size = t_jdbc_sink.jdbc_table.connection_pool_min_size;
-    jdbc_param.connection_pool_max_size = t_jdbc_sink.jdbc_table.connection_pool_max_size;
-    jdbc_param.connection_pool_max_wait_time = t_jdbc_sink.jdbc_table.connection_pool_max_wait_time;
-    jdbc_param.connection_pool_max_life_time = t_jdbc_sink.jdbc_table.connection_pool_max_life_time;
-    jdbc_param.connection_pool_keep_alive = t_jdbc_sink.jdbc_table.connection_pool_keep_alive;
-
-    return jdbc_param;
+    params["jdbc_url"] = t_jdbc_sink.jdbc_table.jdbc_url;
+    params["jdbc_user"] = t_jdbc_sink.jdbc_table.jdbc_user;
+    params["jdbc_password"] = t_jdbc_sink.jdbc_table.jdbc_password;
+    params["jdbc_driver_class"] = t_jdbc_sink.jdbc_table.jdbc_driver_class;
+    // Resolve jdbc_driver_url to absolute file:// URL
+    std::string driver_url;
+    auto resolve_st =
+            JdbcUtils::resolve_driver_url(t_jdbc_sink.jdbc_table.jdbc_driver_url, &driver_url);
+    if (!resolve_st.ok()) {
+        LOG(WARNING) << "Failed to resolve JDBC driver URL: " << resolve_st.to_string();
+        driver_url = t_jdbc_sink.jdbc_table.jdbc_driver_url;
+    }
+    params["jdbc_driver_url"] = driver_url;
+
+    params["jdbc_driver_checksum"] = t_jdbc_sink.jdbc_table.jdbc_driver_checksum;
+    params["insert_sql"] = t_jdbc_sink.insert_sql;
+    params["use_transaction"] = t_jdbc_sink.use_transaction ? "true" : "false";
+    params["catalog_id"] = std::to_string(t_jdbc_sink.jdbc_table.catalog_id);
+    params["connection_pool_min_size"] =
+            std::to_string(t_jdbc_sink.jdbc_table.connection_pool_min_size);
+    params["connection_pool_max_size"] =
+            std::to_string(t_jdbc_sink.jdbc_table.connection_pool_max_size);
+    params["connection_pool_max_wait_time"] =
+            std::to_string(t_jdbc_sink.jdbc_table.connection_pool_max_wait_time);
+    params["connection_pool_max_life_time"] =
+            std::to_string(t_jdbc_sink.jdbc_table.connection_pool_max_life_time);
+    params["connection_pool_keep_alive"] =
+            t_jdbc_sink.jdbc_table.connection_pool_keep_alive ? "true" : "false";
+
+    return params;
 }
 
 VJdbcTableWriter::VJdbcTableWriter(const TDataSink& t_sink,
                                    const VExprContextSPtrs& output_expr_ctxs,
                                    std::shared_ptr<Dependency> dep,
                                    std::shared_ptr<Dependency> fin_dep)
         : AsyncResultWriter(output_expr_ctxs, dep, fin_dep),
-          JdbcConnector(create_connect_param(t_sink)) {}
+          _writer_params(_build_writer_params(t_sink)),
+          _use_transaction(t_sink.jdbc_table_sink.use_transaction) {}
+
+Status VJdbcTableWriter::open(RuntimeState* state, RuntimeProfile* operator_profile) {
+    _writer = std::make_unique<VJniFormatTransformer>(
+            state, _vec_output_expr_ctxs, "org/apache/doris/jdbc/JdbcJniWriter", _writer_params);
+    return _writer->open();
+}
 
 Status VJdbcTableWriter::write(RuntimeState* state, Block& block) {
     Block output_block;
     RETURN_IF_ERROR(_projection_block(block, &output_block));
-    auto num_rows = output_block.rows();
-
-    uint32_t start_send_row = 0;
-    uint32_t num_row_sent = 0;
-    while (start_send_row < num_rows) {
-        RETURN_IF_ERROR(append(&output_block, _vec_output_expr_ctxs, start_send_row, &num_row_sent,
-                               _conn_param.table_type));
-        start_send_row += num_row_sent;
-        num_row_sent = 0;
+
+    if (output_block.rows() == 0) {
+        return Status::OK();
+    }
+
+    return _writer->write(output_block);
+}
+
+Status VJdbcTableWriter::finish(RuntimeState* state) {
+    if (!_use_transaction || !_writer) {
+        return Status::OK();
     }
 
+    // Call commitTrans on the Java JdbcJniWriter via JNI
+    // VJniFormatTransformer manages the JNI writer object, so we use get_statistics
+    // to check for errors. The actual commit is done as part of close() on the Java side
+    // when use_transaction is true.
+    // TODO: Add explicit commitTrans JNI call support to VJniFormatTransformer
+    return Status::OK();
+}

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to