AshinGau commented on code in PR #19950:
URL: https://github.com/apache/doris/pull/19950#discussion_r1205084188


##########
fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java:
##########
@@ -154,22 +158,137 @@ public int write(String sql) throws UdfRuntimeException {
         }
     }
 
-    public int write(Map<String, String> params) {
+    public int write(Map<String, String> params) throws UdfRuntimeException {
         String[] requiredFields = params.get("required_fields").split(",");
         String[] types = params.get("columns_types").split("#");
         long metaAddress = Long.parseLong(params.get("meta_address"));
         // Get sql string from configuration map
-        // String sql = params.get("write_sql");
         ColumnType[] columnTypes = new ColumnType[types.length];
         for (int i = 0; i < types.length; i++) {
             columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
         }
         VectorTable batchTable = new VectorTable(columnTypes, requiredFields, 
metaAddress);
         // todo: insert the batch table by PreparedStatement
         // Can't release or close batchTable, it's released by c++
+        try {
+            insert(batchTable);
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("JDBC executor sql has error: ", e);
+        }
         return batchTable.getNumRows();
     }
 
+    private int insert(VectorTable data) throws SQLException {
+        for (int i = 0; i < data.getNumRows(); ++i) {
+            for (int j = 0; j < data.getColumns().length; ++j) {
+                insertColumn(i, j, data.getColumns()[j]);
+            }
+            preparedStatement.addBatch();
+        }
+        preparedStatement.executeBatch();
+        preparedStatement.clearBatch();
+        return data.getNumRows();
+    }
+
+    private void insertColumn(int rowIdx, int colIdx, VectorColumn column) 
throws SQLException {
+        int parameterIndex = colIdx + 1;
+        ColumnType.Type dorisType = column.getColumnTyp();
+        if (column.isNullAt(rowIdx)) {
+            insertNullColumn(parameterIndex, dorisType);
+            return;
+        }
+        switch (dorisType) {
+            case BOOLEAN:
+                preparedStatement.setBoolean(parameterIndex, 
column.getBoolean(rowIdx));
+                break;
+            case TINYINT:
+                preparedStatement.setByte(parameterIndex, (byte) 
column.getInt(rowIdx));
+                break;
+            case SMALLINT:
+                preparedStatement.setShort(parameterIndex, (short) 
column.getInt(rowIdx));
+                break;
+            case INT:
+                preparedStatement.setInt(parameterIndex, 
column.getInt(rowIdx));
+                break;
+            case BIGINT:
+                preparedStatement.setLong(parameterIndex, 
column.getLong(rowIdx));
+                break;
+            case FLOAT:
+                preparedStatement.setFloat(parameterIndex, 
column.getFloat(rowIdx));
+                break;
+            case DOUBLE:
+                preparedStatement.setDouble(parameterIndex, 
column.getDouble(rowIdx));
+                break;
+            case LARGEINT:
+            case DECIMALV2:
+            case DECIMAL32:
+            case DECIMAL64:
+            case DECIMAL128:
+                preparedStatement.setBigDecimal(parameterIndex, 
column.getDecimal(rowIdx));

Review Comment:
   `LARGEINT` maps to `java.math.BigInteger` in Java, so we should add a 
`getBigInt()` method to `org.apache.doris.jni.vec.ColumnValue`.



##########
fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java:
##########
@@ -154,22 +158,137 @@ public int write(String sql) throws UdfRuntimeException {
         }
     }
 
-    public int write(Map<String, String> params) {
+    public int write(Map<String, String> params) throws UdfRuntimeException {
         String[] requiredFields = params.get("required_fields").split(",");
         String[] types = params.get("columns_types").split("#");
         long metaAddress = Long.parseLong(params.get("meta_address"));
         // Get sql string from configuration map
-        // String sql = params.get("write_sql");
         ColumnType[] columnTypes = new ColumnType[types.length];
         for (int i = 0; i < types.length; i++) {
             columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
         }
         VectorTable batchTable = new VectorTable(columnTypes, requiredFields, 
metaAddress);
         // todo: insert the batch table by PreparedStatement
         // Can't release or close batchTable, it's released by c++
+        try {
+            insert(batchTable);
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("JDBC executor sql has error: ", e);
+        }
         return batchTable.getNumRows();
     }
 
+    private int insert(VectorTable data) throws SQLException {
+        for (int i = 0; i < data.getNumRows(); ++i) {
+            for (int j = 0; j < data.getColumns().length; ++j) {
+                insertColumn(i, j, data.getColumns()[j]);
+            }
+            preparedStatement.addBatch();
+        }
+        preparedStatement.executeBatch();
+        preparedStatement.clearBatch();
+        return data.getNumRows();
+    }
+
+    private void insertColumn(int rowIdx, int colIdx, VectorColumn column) 
throws SQLException {
+        int parameterIndex = colIdx + 1;
+        ColumnType.Type dorisType = column.getColumnTyp();
+        if (column.isNullAt(rowIdx)) {
+            insertNullColumn(parameterIndex, dorisType);
+            return;
+        }
+        switch (dorisType) {
+            case BOOLEAN:
+                preparedStatement.setBoolean(parameterIndex, 
column.getBoolean(rowIdx));
+                break;
+            case TINYINT:
+                preparedStatement.setByte(parameterIndex, (byte) 
column.getInt(rowIdx));
+                break;
+            case SMALLINT:
+                preparedStatement.setShort(parameterIndex, (short) 
column.getInt(rowIdx));
+                break;
+            case INT:
+                preparedStatement.setInt(parameterIndex, 
column.getInt(rowIdx));
+                break;
+            case BIGINT:
+                preparedStatement.setLong(parameterIndex, 
column.getLong(rowIdx));
+                break;
+            case FLOAT:
+                preparedStatement.setFloat(parameterIndex, 
column.getFloat(rowIdx));
+                break;
+            case DOUBLE:
+                preparedStatement.setDouble(parameterIndex, 
column.getDouble(rowIdx));
+                break;
+            case LARGEINT:

Review Comment:
   An insertion method for `LARGEINT` should be added in 
`org.apache.doris.jni.vec.VectorColumn#appendValue`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to