This is an automated email from the ASF dual-hosted git repository.

hubgeter pushed a commit to branch mc-test-branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/mc-test-branch-4.1 by this push:
     new 1b12aee7b49 [fix](maxcompute) Estimate write block size from Arrow buffers, not per-row serialization
1b12aee7b49 is described below

commit 1b12aee7b49b11ffe6459936f71bca8f0ff49f1b
Author: daidai <[email protected]>
AuthorDate: Wed Jun 17 15:56:29 2026 +0800

    [fix](maxcompute) Estimate write block size from Arrow buffers, not per-row serialization
    
    The old per-row estimateSingleRowPayloadBytes ZSTD-serialized a one-row batch for
    every row, which was CPU-heavy and overestimated the payload size by ~25x. Instead,
    sum FieldVector.getBufferSize() over the whole batch, and rotate the block lazily:
    a new block is started before the next write once the current block has reached
    the size target.
---
 .../doris/maxcompute/MaxComputeJniWriter.java      | 163 ++++-----------------
 1 file changed, 27 insertions(+), 136 deletions(-)

diff --git a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
index 334f7124f59..85a5de24182 100644
--- a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
+++ b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
@@ -25,8 +25,6 @@ import org.apache.doris.common.maxcompute.MCUtils;
 
 import com.aliyun.odps.Odps;
 import com.aliyun.odps.OdpsType;
-import com.aliyun.odps.table.arrow.ArrowWriter;
-import com.aliyun.odps.table.arrow.ArrowWriterFactory;
 import com.aliyun.odps.table.configuration.ArrowOptions;
 import com.aliyun.odps.table.configuration.ArrowOptions.TimestampUnit;
 import com.aliyun.odps.table.configuration.CompressionCodec;
@@ -67,7 +65,6 @@ import org.apache.log4j.Logger;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
-import java.io.OutputStream;
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
 import java.time.LocalDate;
@@ -274,7 +271,7 @@ public class MaxComputeJniWriter extends JniWriter {
         long startNs = System.nanoTime();
         logDiag("JAVA_WRITE_INTERNAL_ENTER", "rows=" + numRows + ", columns=" 
+ numCols);
         try {
-            writeRowsWithRowChecks(inputTable, numRows, numCols);
+            writeBatch(inputTable, numRows, numCols);
             logDiag("JAVA_WRITE_INTERNAL_EXIT",
                     "rows=" + numRows + ", columns=" + numCols + ", costMs=" + 
elapsedMs(startNs));
         } catch (Exception e) {
@@ -345,145 +342,39 @@ public class MaxComputeJniWriter extends JniWriter {
         logDiag("JAVA_ROTATE_AFTER", "costMs=" + elapsedMs(startNs));
     }
 
-    private void writeRowsWithRowChecks(VectorTable inputTable, int numRows, 
int numCols) throws IOException {
-        logDiag("JAVA_WRITE_ROWS_ENTER", "rows=" + numRows + ", columns=" + 
numCols);
-        int rowStart = 0;
-        while (rowStart < numRows) {
-            int rowEnd = rowStart;
-            long batchEstimatedBytes = 0L;
-            boolean rotateAfterWrite = false;
-            int estimatedRows = 0;
-            long rangeStartNs = System.nanoTime();
-            logDiag("JAVA_RANGE_SELECT_BEFORE",
-                    "rowStart=" + rowStart
-                            + ", rows=" + numRows
-                            + ", currentBlockWrittenBytes=" + 
currentBlockWrittenBytes);
-            while (rowEnd < numRows) {
-                if (estimatedRows == 0 || estimatedRows % 1024 == 0) {
-                    logDiag("JAVA_RANGE_SELECT_PROGRESS",
-                            "rowStart=" + rowStart
-                                    + ", probingRow=" + rowEnd
-                                    + ", estimatedRows=" + estimatedRows
-                                    + ", batchEstimatedBytes=" + 
batchEstimatedBytes);
-                }
-                long rowEstimatedBytes = 
estimateSingleRowPayloadBytes(inputTable, numCols, rowEnd);
-                estimatedRows++;
-                boolean exceedsHardLimit = currentBlockWrittenBytes + 
batchEstimatedBytes
-                        + rowEstimatedBytes > maxBlockBytes;
-                if (exceedsHardLimit) {
-                    if (rowEnd == rowStart) {
-                        if (currentBlockWrittenBytes > 0) {
-                            
logDiag("JAVA_RANGE_SELECT_ROTATE_FOR_OVERSIZE_ROW",
-                                    "rowStart=" + rowStart
-                                            + ", rowEstimatedBytes=" + 
rowEstimatedBytes
-                                            + ", currentBlockWrittenBytes=" + 
currentBlockWrittenBytes);
-                            rotateCurrentBatchWriter();
-                            continue;
-                        }
-                        batchEstimatedBytes += rowEstimatedBytes;
-                        rowEnd++;
-                        rotateAfterWrite = true;
-                    }
-                    break;
-                }
-                batchEstimatedBytes += rowEstimatedBytes;
-                rowEnd++;
-                if (currentBlockWrittenBytes + batchEstimatedBytes >= 
maxBlockBytes) {
-                    rotateAfterWrite = true;
-                    break;
-                }
-            }
-            logDiag("JAVA_RANGE_SELECT_AFTER",
-                    "rowStart=" + rowStart
-                            + ", rowEnd=" + rowEnd
-                            + ", selectedRows=" + (rowEnd - rowStart)
-                            + ", estimatedRows=" + estimatedRows
-                            + ", batchEstimatedBytes=" + batchEstimatedBytes
-                            + ", rotateAfterWrite=" + rotateAfterWrite
-                            + ", costMs=" + elapsedMs(rangeStartNs));
-
-            if (rowEnd == rowStart) {
-                long fallbackStartNs = System.nanoTime();
-                logDiag("JAVA_RANGE_SELECT_FALLBACK_BEFORE", "rowStart=" + 
rowStart);
-                long rowEstimatedBytes = 
estimateSingleRowPayloadBytes(inputTable, numCols, rowStart);
-                batchEstimatedBytes = rowEstimatedBytes;
-                rowEnd = rowStart + 1;
-                rotateAfterWrite = true;
-                logDiag("JAVA_RANGE_SELECT_FALLBACK_AFTER",
-                        "rowStart=" + rowStart
-                                + ", rowEstimatedBytes=" + rowEstimatedBytes
-                                + ", costMs=" + elapsedMs(fallbackStartNs));
-            }
-
-            long buildStartNs = System.nanoTime();
-            logDiag("JAVA_BUILD_ROOT_BEFORE",
-                    "rowStart=" + rowStart + ", rowEnd=" + rowEnd
-                            + ", selectedRows=" + (rowEnd - rowStart));
-            try (VectorSchemaRoot root = buildRowRangeRoot(inputTable, 
numCols, rowStart, rowEnd, true)) {
-                logDiag("JAVA_BUILD_ROOT_AFTER",
-                        "rowStart=" + rowStart + ", rowEnd=" + rowEnd
-                                + ", selectedRows=" + (rowEnd - rowStart)
-                                + ", costMs=" + elapsedMs(buildStartNs));
-                long writeStartNs = System.nanoTime();
-                logDiag("JAVA_BATCH_WRITE_BEFORE",
-                        "rowStart=" + rowStart + ", rowEnd=" + rowEnd
-                                + ", selectedRows=" + (rowEnd - rowStart)
-                                + ", batchEstimatedBytes=" + 
batchEstimatedBytes);
-                batchWriter.write(root);
-                logDiag("JAVA_BATCH_WRITE_AFTER",
-                        "rowStart=" + rowStart + ", rowEnd=" + rowEnd
-                                + ", selectedRows=" + (rowEnd - rowStart)
-                                + ", costMs=" + elapsedMs(writeStartNs));
-            }
-            long flushStartNs = System.nanoTime();
-            logDiag("JAVA_FLUSH_BEFORE",
-                    "rowStart=" + rowStart + ", rowEnd=" + rowEnd
-                            + ", selectedRows=" + (rowEnd - rowStart));
-            batchWriter.flush();
-            logDiag("JAVA_FLUSH_AFTER",
-                    "rowStart=" + rowStart + ", rowEnd=" + rowEnd
-                            + ", selectedRows=" + (rowEnd - rowStart)
-                            + ", costMs=" + elapsedMs(flushStartNs));
-            int rowsWrittenNow = rowEnd - rowStart;
-            writtenRows += rowsWrittenNow;
-            currentBlockWrittenBytes += batchEstimatedBytes;
-            writtenBytes += batchEstimatedBytes;
-            logDiag("JAVA_BATCH_DONE",
-                    "rowsWrittenNow=" + rowsWrittenNow
-                            + ", nextRowStart=" + rowEnd
-                            + ", currentBlockWrittenBytes=" + 
currentBlockWrittenBytes
-                            + ", writtenRows=" + writtenRows
-                            + ", writtenBytes=" + writtenBytes
-                            + ", rotateAfterWrite=" + rotateAfterWrite);
-            rowStart = rowEnd;
-
-            if (rotateAfterWrite && rowStart < numRows) {
-                rotateCurrentBatchWriter();
-            }
+    private void writeBatch(VectorTable inputTable, int numRows, int numCols) 
throws IOException {
+        // Roll to a fresh block before writing once the current one hits the 
size target.
+        if (batchWriter != null && currentBlockWrittenBytes >= maxBlockBytes) {
+            rotateCurrentBatchWriter();
         }
-        logDiag("JAVA_WRITE_ROWS_EXIT", "rows=" + numRows + ", columns=" + 
numCols);
-    }
 
-    private static class CountingDiscardOutputStream extends OutputStream {
-        @Override
-        public void write(int b) {
-            // Discard bytes while allowing WriteChannel to track payload size.
+        long startNs = System.nanoTime();
+        long batchBytes;
+        try (VectorSchemaRoot root = buildRowRangeRoot(inputTable, numCols, 0, 
numRows, false)) {
+            batchBytes = estimateBatchPayloadBytes(root);
+            batchWriter.write(root);
         }
+        batchWriter.flush();
 
-        @Override
-        public void write(byte[] b, int off, int len) {
-            // Discard bytes while allowing WriteChannel to track payload size.
-        }
+        writtenRows += numRows;
+        currentBlockWrittenBytes += batchBytes;
+        writtenBytes += batchBytes;
+        logDiag("JAVA_BATCH_DONE",
+                "rows=" + numRows
+                        + ", batchBytes=" + batchBytes
+                        + ", currentBlockWrittenBytes=" + 
currentBlockWrittenBytes
+                        + ", writtenRows=" + writtenRows
+                        + ", writtenBytes=" + writtenBytes
+                        + ", costMs=" + elapsedMs(startNs));
     }
 
-    private long estimateSingleRowPayloadBytes(VectorTable inputTable, int 
numCols, int rowIndex)
-            throws IOException {
-        try (VectorSchemaRoot root = buildRowRangeRoot(inputTable, numCols, 
rowIndex, rowIndex + 1, false);
-                ArrowWriter estimator = 
ArrowWriterFactory.getRecordBatchWriter(
-                        new CountingDiscardOutputStream(), writerOptions)) {
-            estimator.writeBatch(root);
-            return estimator.bytesWritten();
+    // Estimate an Arrow batch's payload size from its column buffer sizes 
(O(columns)).
+    static long estimateBatchPayloadBytes(VectorSchemaRoot root) {
+        long total = 0L;
+        for (FieldVector vector : root.getFieldVectors()) {
+            total += vector.getBufferSize();
         }
+        return total;
     }
 
     private VectorSchemaRoot buildRowRangeRoot(VectorTable inputTable, int 
numCols, int rowStart, int rowEnd,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to