This is an automated email from the ASF dual-hosted git repository.
hubgeter pushed a commit to branch mc-test-branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/mc-test-branch-4.1 by this
push:
new 1b12aee7b49 [fix](maxcompute) Estimate write block size from Arrow buffers, not per-row serialization
1b12aee7b49 is described below
commit 1b12aee7b49b11ffe6459936f71bca8f0ff49f1b
Author: daidai <[email protected]>
AuthorDate: Wed Jun 17 15:56:29 2026 +0800
[fix](maxcompute) Estimate write block size from Arrow buffers, not per-row serialization
The old per-row estimateSingleRowPayloadBytes ZSTD-serialized a one-row batch
for every row, which was CPU-heavy and overestimated the size by ~25x.
Instead, sum FieldVector.getBufferSize() over the whole batch, and rotate the
block lazily: before the next write, once the current block has reached
maxBlockBytes.
---
.../doris/maxcompute/MaxComputeJniWriter.java | 163 ++++-----------------
1 file changed, 27 insertions(+), 136 deletions(-)
diff --git
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
index 334f7124f59..85a5de24182 100644
---
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
+++
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
@@ -25,8 +25,6 @@ import org.apache.doris.common.maxcompute.MCUtils;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsType;
-import com.aliyun.odps.table.arrow.ArrowWriter;
-import com.aliyun.odps.table.arrow.ArrowWriterFactory;
import com.aliyun.odps.table.configuration.ArrowOptions;
import com.aliyun.odps.table.configuration.ArrowOptions.TimestampUnit;
import com.aliyun.odps.table.configuration.CompressionCodec;
@@ -67,7 +65,6 @@ import org.apache.log4j.Logger;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
-import java.io.OutputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
@@ -274,7 +271,7 @@ public class MaxComputeJniWriter extends JniWriter {
long startNs = System.nanoTime();
logDiag("JAVA_WRITE_INTERNAL_ENTER", "rows=" + numRows + ", columns="
+ numCols);
try {
- writeRowsWithRowChecks(inputTable, numRows, numCols);
+ writeBatch(inputTable, numRows, numCols);
logDiag("JAVA_WRITE_INTERNAL_EXIT",
"rows=" + numRows + ", columns=" + numCols + ", costMs=" +
elapsedMs(startNs));
} catch (Exception e) {
@@ -345,145 +342,39 @@ public class MaxComputeJniWriter extends JniWriter {
logDiag("JAVA_ROTATE_AFTER", "costMs=" + elapsedMs(startNs));
}
- private void writeRowsWithRowChecks(VectorTable inputTable, int numRows,
int numCols) throws IOException {
- logDiag("JAVA_WRITE_ROWS_ENTER", "rows=" + numRows + ", columns=" +
numCols);
- int rowStart = 0;
- while (rowStart < numRows) {
- int rowEnd = rowStart;
- long batchEstimatedBytes = 0L;
- boolean rotateAfterWrite = false;
- int estimatedRows = 0;
- long rangeStartNs = System.nanoTime();
- logDiag("JAVA_RANGE_SELECT_BEFORE",
- "rowStart=" + rowStart
- + ", rows=" + numRows
- + ", currentBlockWrittenBytes=" +
currentBlockWrittenBytes);
- while (rowEnd < numRows) {
- if (estimatedRows == 0 || estimatedRows % 1024 == 0) {
- logDiag("JAVA_RANGE_SELECT_PROGRESS",
- "rowStart=" + rowStart
- + ", probingRow=" + rowEnd
- + ", estimatedRows=" + estimatedRows
- + ", batchEstimatedBytes=" +
batchEstimatedBytes);
- }
- long rowEstimatedBytes =
estimateSingleRowPayloadBytes(inputTable, numCols, rowEnd);
- estimatedRows++;
- boolean exceedsHardLimit = currentBlockWrittenBytes +
batchEstimatedBytes
- + rowEstimatedBytes > maxBlockBytes;
- if (exceedsHardLimit) {
- if (rowEnd == rowStart) {
- if (currentBlockWrittenBytes > 0) {
-
logDiag("JAVA_RANGE_SELECT_ROTATE_FOR_OVERSIZE_ROW",
- "rowStart=" + rowStart
- + ", rowEstimatedBytes=" +
rowEstimatedBytes
- + ", currentBlockWrittenBytes=" +
currentBlockWrittenBytes);
- rotateCurrentBatchWriter();
- continue;
- }
- batchEstimatedBytes += rowEstimatedBytes;
- rowEnd++;
- rotateAfterWrite = true;
- }
- break;
- }
- batchEstimatedBytes += rowEstimatedBytes;
- rowEnd++;
- if (currentBlockWrittenBytes + batchEstimatedBytes >=
maxBlockBytes) {
- rotateAfterWrite = true;
- break;
- }
- }
- logDiag("JAVA_RANGE_SELECT_AFTER",
- "rowStart=" + rowStart
- + ", rowEnd=" + rowEnd
- + ", selectedRows=" + (rowEnd - rowStart)
- + ", estimatedRows=" + estimatedRows
- + ", batchEstimatedBytes=" + batchEstimatedBytes
- + ", rotateAfterWrite=" + rotateAfterWrite
- + ", costMs=" + elapsedMs(rangeStartNs));
-
- if (rowEnd == rowStart) {
- long fallbackStartNs = System.nanoTime();
- logDiag("JAVA_RANGE_SELECT_FALLBACK_BEFORE", "rowStart=" +
rowStart);
- long rowEstimatedBytes =
estimateSingleRowPayloadBytes(inputTable, numCols, rowStart);
- batchEstimatedBytes = rowEstimatedBytes;
- rowEnd = rowStart + 1;
- rotateAfterWrite = true;
- logDiag("JAVA_RANGE_SELECT_FALLBACK_AFTER",
- "rowStart=" + rowStart
- + ", rowEstimatedBytes=" + rowEstimatedBytes
- + ", costMs=" + elapsedMs(fallbackStartNs));
- }
-
- long buildStartNs = System.nanoTime();
- logDiag("JAVA_BUILD_ROOT_BEFORE",
- "rowStart=" + rowStart + ", rowEnd=" + rowEnd
- + ", selectedRows=" + (rowEnd - rowStart));
- try (VectorSchemaRoot root = buildRowRangeRoot(inputTable,
numCols, rowStart, rowEnd, true)) {
- logDiag("JAVA_BUILD_ROOT_AFTER",
- "rowStart=" + rowStart + ", rowEnd=" + rowEnd
- + ", selectedRows=" + (rowEnd - rowStart)
- + ", costMs=" + elapsedMs(buildStartNs));
- long writeStartNs = System.nanoTime();
- logDiag("JAVA_BATCH_WRITE_BEFORE",
- "rowStart=" + rowStart + ", rowEnd=" + rowEnd
- + ", selectedRows=" + (rowEnd - rowStart)
- + ", batchEstimatedBytes=" +
batchEstimatedBytes);
- batchWriter.write(root);
- logDiag("JAVA_BATCH_WRITE_AFTER",
- "rowStart=" + rowStart + ", rowEnd=" + rowEnd
- + ", selectedRows=" + (rowEnd - rowStart)
- + ", costMs=" + elapsedMs(writeStartNs));
- }
- long flushStartNs = System.nanoTime();
- logDiag("JAVA_FLUSH_BEFORE",
- "rowStart=" + rowStart + ", rowEnd=" + rowEnd
- + ", selectedRows=" + (rowEnd - rowStart));
- batchWriter.flush();
- logDiag("JAVA_FLUSH_AFTER",
- "rowStart=" + rowStart + ", rowEnd=" + rowEnd
- + ", selectedRows=" + (rowEnd - rowStart)
- + ", costMs=" + elapsedMs(flushStartNs));
- int rowsWrittenNow = rowEnd - rowStart;
- writtenRows += rowsWrittenNow;
- currentBlockWrittenBytes += batchEstimatedBytes;
- writtenBytes += batchEstimatedBytes;
- logDiag("JAVA_BATCH_DONE",
- "rowsWrittenNow=" + rowsWrittenNow
- + ", nextRowStart=" + rowEnd
- + ", currentBlockWrittenBytes=" +
currentBlockWrittenBytes
- + ", writtenRows=" + writtenRows
- + ", writtenBytes=" + writtenBytes
- + ", rotateAfterWrite=" + rotateAfterWrite);
- rowStart = rowEnd;
-
- if (rotateAfterWrite && rowStart < numRows) {
- rotateCurrentBatchWriter();
- }
+ private void writeBatch(VectorTable inputTable, int numRows, int numCols)
throws IOException {
+ // Roll to a fresh block before writing once the current one hits the
size target.
+ if (batchWriter != null && currentBlockWrittenBytes >= maxBlockBytes) {
+ rotateCurrentBatchWriter();
}
- logDiag("JAVA_WRITE_ROWS_EXIT", "rows=" + numRows + ", columns=" +
numCols);
- }
- private static class CountingDiscardOutputStream extends OutputStream {
- @Override
- public void write(int b) {
- // Discard bytes while allowing WriteChannel to track payload size.
+ long startNs = System.nanoTime();
+ long batchBytes;
+ try (VectorSchemaRoot root = buildRowRangeRoot(inputTable, numCols, 0,
numRows, false)) {
+ batchBytes = estimateBatchPayloadBytes(root);
+ batchWriter.write(root);
}
+ batchWriter.flush();
- @Override
- public void write(byte[] b, int off, int len) {
- // Discard bytes while allowing WriteChannel to track payload size.
- }
+ writtenRows += numRows;
+ currentBlockWrittenBytes += batchBytes;
+ writtenBytes += batchBytes;
+ logDiag("JAVA_BATCH_DONE",
+ "rows=" + numRows
+ + ", batchBytes=" + batchBytes
+ + ", currentBlockWrittenBytes=" +
currentBlockWrittenBytes
+ + ", writtenRows=" + writtenRows
+ + ", writtenBytes=" + writtenBytes
+ + ", costMs=" + elapsedMs(startNs));
}
- private long estimateSingleRowPayloadBytes(VectorTable inputTable, int
numCols, int rowIndex)
- throws IOException {
- try (VectorSchemaRoot root = buildRowRangeRoot(inputTable, numCols,
rowIndex, rowIndex + 1, false);
- ArrowWriter estimator =
ArrowWriterFactory.getRecordBatchWriter(
- new CountingDiscardOutputStream(), writerOptions)) {
- estimator.writeBatch(root);
- return estimator.bytesWritten();
+ // Estimate an Arrow batch's payload size from its column buffer sizes
(O(columns)).
+ static long estimateBatchPayloadBytes(VectorSchemaRoot root) {
+ long total = 0L;
+ for (FieldVector vector : root.getFieldVectors()) {
+ total += vector.getBufferSize();
}
+ return total;
}
private VectorSchemaRoot buildRowRangeRoot(VectorTable inputTable, int
numCols, int rowStart, int rowEnd,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]