This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 647e13eae07 [enhancement](image): load image supports reading in 
batches (#50183)
647e13eae07 is described below

commit 647e13eae07bde9d42cb1d4f744351640485b1c8
Author: htyoung <hty551...@hotmail.com>
AuthorDate: Sat Apr 26 14:55:36 2025 +0800

    [enhancement](image): load image supports reading in batches (#50183)
    
    Text.readString supports reading in batches to avoid the situation where
    the size of delete info in image file becomes too large due to a long
    period without checkpointing, which could cause the method
    CharBuffer.allocate size to exceed the maximum value of
    Integer.MAX_VALUE and result in an overflow.
---
 .../main/java/org/apache/doris/common/Config.java  |  13 +++
 .../main/java/org/apache/doris/common/io/Text.java | 109 ++++++++++++---------
 .../java/org/apache/doris/common/io/TextTest.java  |  65 ++++++++++++
 3 files changed, 143 insertions(+), 44 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 082bbf95921..15d31a50935 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2883,6 +2883,19 @@ public class Config extends ConfigBase {
     })
     public static int sync_image_timeout_second = 300;
 
+    @ConfField(mutable = true, description = {
+        "FE启动时加载image文件某个模块的二进制内容到字节数组,并将字节数组反序列化为utf8编码字符串时单批次(单位:byte, 
至少500MB)"
+            + "的大小。等于-1的值表示一次性读取完整的字节数组后反序列化反序列化为utf8编码字符串;"
+            + 
"不等于-1的值(至少16MB)表示分批每次读取多大的字节数组后反序列化为utf8编码字符串,最后合并成完成的字符串。默认值为-1",
+        "The size of a single batch (in bytes) when loading the binary content 
of a module of the "
+            + "image file into a byte array and deserializing the byte array 
into a utf8 encoded string when FE starts."
+            + " A value equal to -1 means reading the entire byte array at 
once and "
+            + "then deserializing it into a utf8 encoded string; a value not 
equal to -1 means reading "
+            + "a certain size (at least 16MB) of byte array in batches and 
then deserializing it into a "
+            + "utf8 encoded string, and finally merging it into a completed 
string. The default value is -1"
+    })
+    public static int metadata_text_read_max_batch_bytes = -1;
+
     @ConfField(mutable = true, masterOnly = true)
     public static int publish_topic_info_interval_ms = 30000; // 30s
 
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/io/Text.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/io/Text.java
index 76256f30feb..50efafc55ab 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/io/Text.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/io/Text.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.common.io;
 
+import org.apache.doris.common.Config;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -26,11 +28,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
 import java.nio.charset.CharsetDecoder;
 import java.nio.charset.CharsetEncoder;
 import java.nio.charset.CodingErrorAction;
 import java.nio.charset.MalformedInputException;
+import java.nio.charset.StandardCharsets;
 import java.text.CharacterIterator;
 import java.text.StringCharacterIterator;
 
@@ -48,17 +50,17 @@ import java.text.StringCharacterIterator;
 public class Text implements Writable {
     private static final Logger LOG = LoggerFactory.getLogger(Text.class);
 
-    private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new 
ThreadLocal<CharsetEncoder>() {
+    private static final ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new 
ThreadLocal<CharsetEncoder>() {
         protected CharsetEncoder initialValue() {
-            return Charset.forName("UTF-8").newEncoder()
+            return StandardCharsets.UTF_8.newEncoder()
                     .onMalformedInput(CodingErrorAction.REPORT)
                     .onUnmappableCharacter(CodingErrorAction.REPORT);
         }
     };
 
-    private static ThreadLocal<CharsetDecoder> DECODER_FACTORY = new 
ThreadLocal<CharsetDecoder>() {
+    private static final ThreadLocal<CharsetDecoder> DECODER_FACTORY = new 
ThreadLocal<CharsetDecoder>() {
         protected CharsetDecoder initialValue() {
-            return Charset.forName("UTF-8").newDecoder()
+            return StandardCharsets.UTF_8.newDecoder()
                     .onMalformedInput(CodingErrorAction.REPORT)
                     .onUnmappableCharacter(CodingErrorAction.REPORT);
         }
@@ -85,7 +87,6 @@ public class Text implements Writable {
         set(utf8);
     }
 
-
     // Returns the raw bytes; however, only data up to getLength() is valid.
     public byte[] getBytes() {
         return bytes;
@@ -205,12 +206,9 @@ public class Text implements Writable {
     /**
      * Set the Text to range of bytes
      *
-     * @param utf8
-     *            the data to copy from
-     * @param start
-     *            the first position of the new string
-     * @param len
-     *            the number of bytes of the new string
+     * @param utf8 the data to copy from
+     * @param start the first position of the new string
+     * @param len the number of bytes of the new string
      */
     public void set(byte[] utf8, int start, int len) {
         setCapacity(len, false);
@@ -221,12 +219,9 @@ public class Text implements Writable {
     /**
      * Append a range of bytes to the end of the given text
      *
-     * @param utf8
-     *            the data to copy from
-     * @param start
-     *            the first position to append from utf8
-     * @param len
-     *            the number of bytes to append
+     * @param utf8 the data to copy from
+     * @param start the first position to append from utf8
+     * @param len the number of bytes to append
      */
     public void append(byte[] utf8, int start, int len) {
         setCapacity(length + len, true);
@@ -238,12 +233,9 @@ public class Text implements Writable {
      * Append a range of bytes to the end of the given text, and adjust
      * underlying buffer to reduce mem copy times
      *
-     * @param utf8
-     *            the data to copy from
-     * @param start
-     *            the first position to append from utf8
-     * @param len
-     *            the number of bytes to append
+     * @param utf8 the data to copy from
+     * @param start the first position to append from utf8
+     * @param len the number of bytes to append
      */
     public void appendAdjust(byte[] utf8, int start, int len) {
         int newLen = length + len;
@@ -413,8 +405,44 @@ public class Text implements Writable {
         int length = in.readInt();
         byte[] bytes = new byte[length];
         in.readFully(bytes, 0, length);
-        String res = decode(bytes);
-        return res;
+        if (Config.metadata_text_read_max_batch_bytes == -1) {
+            return decode(bytes);
+        } else {
+            // if the Config.metadata_image_module_load_batch_size != -1 will 
read bytes array and
+            // deserialize utf8 encode string in batch
+            int batchSize = 
Math.max(Config.metadata_text_read_max_batch_bytes, 16 * 1024 * 1024);
+            int offset = 0;
+            StringBuilder sb = new StringBuilder();
+            while (offset < length) {
+                int chunkSize = Math.min(batchSize, length - offset);
+                // the last chunkSize should not adjust the safe cut position
+                if (offset + chunkSize < length) {
+                    // find the safe cut position in utf8 encoded bytes
+                    chunkSize = findSafeCutPosition(bytes, offset, chunkSize);
+                }
+                sb.append(decode(bytes, offset, chunkSize));
+                offset += chunkSize;
+            }
+            return sb.toString();
+        }
+    }
+
+    private static int findSafeCutPosition(byte[] bytes, int start, int 
length) {
+        int end = start + length;
+        // Traverse backwards to find the last complete UTF-8 character
+        while (end > start) {
+            byte b = bytes[end - 1];
+            // Check if the byte is a continuation byte (10xxxxxx)
+            if ((b & 0xC0) == 0x80) {
+                // If it is a continuation byte, move to the previous byte
+                end--;
+            } else {
+                // If it is not a continuation byte, it is the start of a 
character
+                break;
+            }
+        }
+        // The safe length is the difference between the end - 1 position and 
the start position
+        return end - 1 - start;
     }
 
     /**
@@ -439,10 +467,8 @@ public class Text implements Writable {
     /**
      * Check if a byte array contains valid utf-8
      *
-     * @param utf8
-     *            byte array
-     * @throws MalformedInputException
-     *             if the byte array contains invalid utf-8
+     * @param utf8 byte array
+     * @throws MalformedInputException if the byte array contains invalid utf-8
      */
     public static void validateUTF8(byte[] utf8) throws 
MalformedInputException {
         validateUTF8(utf8, 0, utf8.length);
@@ -451,14 +477,10 @@ public class Text implements Writable {
     /**
      * Check to see if a byte array is valid utf-8
      *
-     * @param utf8
-     *            the array of bytes
-     * @param start
-     *            the offset of the first byte in the array
-     * @param len
-     *            the length of the byte sequence
-     * @throws MalformedInputException
-     *             if the byte array contains invalid bytes
+     * @param utf8 the array of bytes
+     * @param start the offset of the first byte in the array
+     * @param len the length of the byte sequence
+     * @throws MalformedInputException if the byte array contains invalid bytes
      */
     public static void validateUTF8(byte[] utf8, int start, int len)
             throws MalformedInputException {
@@ -540,7 +562,7 @@ public class Text implements Writable {
      * values 4 and 5 are presented in this table, even though valid UTF-8
      * cannot include the five and six byte sequences.
      */
-    static final int[] bytesFromUTF8 = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+    static final int[] bytesFromUTF8 = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
             0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
             0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
             0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
@@ -559,7 +581,7 @@ public class Text implements Writable {
             -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1, 1, 1,
             1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
             1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3,
-            3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5 };
+            3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5};
 
     /**
      * Returns the next code point at the current position in the buffer. The
@@ -606,15 +628,14 @@ public class Text implements Writable {
         return ch;
     }
 
-    static final int[] offsetsFromUTF8 = { 0x00000000, 0x00003080, 0x000E2080,
-            0x03C82080, 0xFA082080, 0x82082080 };
+    static final int[] offsetsFromUTF8 = {0x00000000, 0x00003080, 0x000E2080,
+            0x03C82080, 0xFA082080, 0x82082080};
 
     /**
      * For the given string, returns the number of UTF-8 bytes required to
      * encode the string.
      *
-     * @param string
-     *            text to encode
+     * @param string text to encode
      * @return number of UTF-8 bytes required to encode
      */
     public static int utf8Length(String string) {
diff --git 
a/fe/fe-common/src/test/java/org/apache/doris/common/io/TextTest.java 
b/fe/fe-common/src/test/java/org/apache/doris/common/io/TextTest.java
new file mode 100644
index 00000000000..9a3adaab945
--- /dev/null
+++ b/fe/fe-common/src/test/java/org/apache/doris/common/io/TextTest.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.io;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.stream.Stream;
+
+public class TextTest {
+    static Stream<Arguments> provideTestData() {
+        return Stream.of(
+                Arguments.arguments("Hello".getBytes(StandardCharsets.UTF_8)),
+                Arguments.arguments("helloé".getBytes(StandardCharsets.UTF_8)),
+                
Arguments.arguments(createBytes("中".getBytes(StandardCharsets.UTF_8), 1024 * 
1024 * 16)),
+                
Arguments.arguments(createBytes("中".getBytes(StandardCharsets.UTF_8), 1024 * 
1024 * 16 + 1)),
+                
Arguments.arguments(createBytes("中".getBytes(StandardCharsets.UTF_8), 1024 * 
1024 * 32)),
+                
Arguments.arguments(createBytes("中".getBytes(StandardCharsets.UTF_8), 
43214321)),
+                
Arguments.arguments("特殊\n\r\t字符".getBytes(StandardCharsets.UTF_8))
+        );
+    }
+
+    @ParameterizedTest(name = "[{index}] {arguments}")
+    @MethodSource("provideTestData")
+    void testReadString(byte[] inputBytes) throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutput dataOutput = new DataOutputStream(bos);
+        dataOutput.writeInt(inputBytes.length);
+        dataOutput.write(inputBytes);
+        String result = Text.readString(new DataInputStream(new 
ByteArrayInputStream(bos.toByteArray())));
+        Assertions.assertEquals(new String(inputBytes, 
StandardCharsets.UTF_8), result);
+    }
+
+    private static byte[] createBytes(byte[] metaBytes, int scala) {
+        byte[] bytes = new byte[metaBytes.length * scala];
+        for (int i = 0; i < scala; i++) {
+            System.arraycopy(metaBytes, 0, bytes, i * metaBytes.length, 
metaBytes.length);
+        }
+        return bytes;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to