This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 647e13eae07 [enhancement](image): load image supports reading in batches (#50183) 647e13eae07 is described below commit 647e13eae07bde9d42cb1d4f744351640485b1c8 Author: htyoung <hty551...@hotmail.com> AuthorDate: Sat Apr 26 14:55:36 2025 +0800 [enhancement](image): load image supports reading in batches (#50183) Text.readString now supports reading in batches. If no checkpoint is taken for a long period, the delete info in the image file can grow so large that the buffer size passed to CharBuffer.allocate exceeds Integer.MAX_VALUE and overflows; decoding the bytes in batches avoids this. --- .../main/java/org/apache/doris/common/Config.java | 13 +++ .../main/java/org/apache/doris/common/io/Text.java | 109 ++++++++++++--------- .../java/org/apache/doris/common/io/TextTest.java | 65 ++++++++++++ 3 files changed, 143 insertions(+), 44 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 082bbf95921..15d31a50935 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2883,6 +2883,19 @@ public class Config extends ConfigBase { }) public static int sync_image_timeout_second = 300; + @ConfField(mutable = true, description = { + "FE启动时加载image文件某个模块的二进制内容到字节数组,并将字节数组反序列化为utf8编码字符串时单批次(单位:byte, 至少500MB)" + + "的大小。等于-1的值表示一次性读取完整的字节数组后反序列化反序列化为utf8编码字符串;" + + "不等于-1的值(至少16MB)表示分批每次读取多大的字节数组后反序列化为utf8编码字符串,最后合并成完成的字符串。默认值为-1", + "The size of a single batch (in bytes) when loading the binary content of a module of the " + + "image file into a byte array and deserializing the byte array into a utf8 encoded string when FE starts."
+ + " A value equal to -1 means reading the entire byte array at once and " + + "then deserializing it into a utf8 encoded string; a value not equal to -1 means reading " + + "a certain size (at least 16MB) of byte array in batches and then deserializing it into a " + + "utf8 encoded string, and finally merging it into a completed string. The default value is -1" + }) + public static int metadata_text_read_max_batch_bytes = -1; + @ConfField(mutable = true, masterOnly = true) public static int publish_topic_info_interval_ms = 30000; // 30s diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/io/Text.java b/fe/fe-common/src/main/java/org/apache/doris/common/io/Text.java index 76256f30feb..50efafc55ab 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/io/Text.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/io/Text.java @@ -17,6 +17,8 @@ package org.apache.doris.common.io; +import org.apache.doris.common.Config; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,11 +28,11 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetEncoder; import java.nio.charset.CodingErrorAction; import java.nio.charset.MalformedInputException; +import java.nio.charset.StandardCharsets; import java.text.CharacterIterator; import java.text.StringCharacterIterator; @@ -48,17 +50,17 @@ import java.text.StringCharacterIterator; public class Text implements Writable { private static final Logger LOG = LoggerFactory.getLogger(Text.class); - private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal<CharsetEncoder>() { + private static final ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal<CharsetEncoder>() { protected CharsetEncoder initialValue() { - return Charset.forName("UTF-8").newEncoder() + return 
StandardCharsets.UTF_8.newEncoder() .onMalformedInput(CodingErrorAction.REPORT) .onUnmappableCharacter(CodingErrorAction.REPORT); } }; - private static ThreadLocal<CharsetDecoder> DECODER_FACTORY = new ThreadLocal<CharsetDecoder>() { + private static final ThreadLocal<CharsetDecoder> DECODER_FACTORY = new ThreadLocal<CharsetDecoder>() { protected CharsetDecoder initialValue() { - return Charset.forName("UTF-8").newDecoder() + return StandardCharsets.UTF_8.newDecoder() .onMalformedInput(CodingErrorAction.REPORT) .onUnmappableCharacter(CodingErrorAction.REPORT); } @@ -85,7 +87,6 @@ public class Text implements Writable { set(utf8); } - // Returns the raw bytes; however, only data up to getLength() is valid. public byte[] getBytes() { return bytes; @@ -205,12 +206,9 @@ public class Text implements Writable { /** * Set the Text to range of bytes * - * @param utf8 - * the data to copy from - * @param start - * the first position of the new string - * @param len - * the number of bytes of the new string + * @param utf8 the data to copy from + * @param start the first position of the new string + * @param len the number of bytes of the new string */ public void set(byte[] utf8, int start, int len) { setCapacity(len, false); @@ -221,12 +219,9 @@ public class Text implements Writable { /** * Append a range of bytes to the end of the given text * - * @param utf8 - * the data to copy from - * @param start - * the first position to append from utf8 - * @param len - * the number of bytes to append + * @param utf8 the data to copy from + * @param start the first position to append from utf8 + * @param len the number of bytes to append */ public void append(byte[] utf8, int start, int len) { setCapacity(length + len, true); @@ -238,12 +233,9 @@ public class Text implements Writable { * Append a range of bytes to the end of the given text, and adjust * underlying buffer to reduce mem copy times * - * @param utf8 - * the data to copy from - * @param start - * the first position to 
append from utf8 - * @param len - * the number of bytes to append + * @param utf8 the data to copy from + * @param start the first position to append from utf8 + * @param len the number of bytes to append */ public void appendAdjust(byte[] utf8, int start, int len) { int newLen = length + len; @@ -413,8 +405,44 @@ public class Text implements Writable { int length = in.readInt(); byte[] bytes = new byte[length]; in.readFully(bytes, 0, length); - String res = decode(bytes); - return res; + if (Config.metadata_text_read_max_batch_bytes == -1) { + return decode(bytes); + } else { + // Config.metadata_text_read_max_batch_bytes != -1: decode the byte array into a + // utf8 string in batches of at least 16MB and concatenate the pieces + int batchSize = Math.max(Config.metadata_text_read_max_batch_bytes, 16 * 1024 * 1024); + int offset = 0; + StringBuilder sb = new StringBuilder(); + while (offset < length) { + int chunkSize = Math.min(batchSize, length - offset); + // the last chunk ends at the end of the input, so it needs no safe-cut adjustment + if (offset + chunkSize < length) { + // find the safe cut position in utf8 encoded bytes + chunkSize = findSafeCutPosition(bytes, offset, chunkSize); + } + sb.append(decode(bytes, offset, chunkSize)); + offset += chunkSize; + } + return sb.toString(); + } + } + + private static int findSafeCutPosition(byte[] bytes, int start, int length) { + int end = start + length; + // Traverse backwards to the last byte in the range that is not a UTF-8 continuation byte + while (end > start) { + byte b = bytes[end - 1]; + // Check if the byte is a continuation byte (10xxxxxx) + if ((b & 0xC0) == 0x80) { + // If it is a continuation byte, move to the previous byte + end--; + } else { + // If it is not a continuation byte, it is the start of a character + break; + } + } + // Cut just before that byte so it is decoded with the next chunk (yields -1 if every byte in the range is a continuation byte) + return end - 1 - start; } /** @@ -439,10 +467,8 @@ public class Text implements Writable { /** * Check if a byte array contains valid utf-8 * -
@param utf8 - * byte array - * @throws MalformedInputException - * if the byte array contains invalid utf-8 + * @param utf8 byte array + * @throws MalformedInputException if the byte array contains invalid utf-8 */ public static void validateUTF8(byte[] utf8) throws MalformedInputException { validateUTF8(utf8, 0, utf8.length); @@ -451,14 +477,10 @@ public class Text implements Writable { /** * Check to see if a byte array is valid utf-8 * - * @param utf8 - * the array of bytes - * @param start - * the offset of the first byte in the array - * @param len - * the length of the byte sequence - * @throws MalformedInputException - * if the byte array contains invalid bytes + * @param utf8 the array of bytes + * @param start the offset of the first byte in the array + * @param len the length of the byte sequence + * @throws MalformedInputException if the byte array contains invalid bytes */ public static void validateUTF8(byte[] utf8, int start, int len) throws MalformedInputException { @@ -540,7 +562,7 @@ public class Text implements Writable { * values 4 and 5 are presented in this table, even though valid UTF-8 * cannot include the five and six byte sequences. */ - static final int[] bytesFromUTF8 = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + static final int[] bytesFromUTF8 = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, @@ -559,7 +581,7 @@ public class Text implements Writable { -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, - 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5 }; + 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5}; /** * Returns the next code point at the current position in the buffer. 
The @@ -606,15 +628,14 @@ public class Text implements Writable { return ch; } - static final int[] offsetsFromUTF8 = { 0x00000000, 0x00003080, 0x000E2080, - 0x03C82080, 0xFA082080, 0x82082080 }; + static final int[] offsetsFromUTF8 = {0x00000000, 0x00003080, 0x000E2080, + 0x03C82080, 0xFA082080, 0x82082080}; /** * For the given string, returns the number of UTF-8 bytes required to * encode the string. * - * @param string - * text to encode + * @param string text to encode * @return number of UTF-8 bytes required to encode */ public static int utf8Length(String string) { diff --git a/fe/fe-common/src/test/java/org/apache/doris/common/io/TextTest.java b/fe/fe-common/src/test/java/org/apache/doris/common/io/TextTest.java new file mode 100644 index 00000000000..9a3adaab945 --- /dev/null +++ b/fe/fe-common/src/test/java/org/apache/doris/common/io/TextTest.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common.io; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.stream.Stream; + +public class TextTest { + static Stream<Arguments> provideTestData() { + return Stream.of( + Arguments.arguments("Hello".getBytes(StandardCharsets.UTF_8)), + Arguments.arguments("helloé".getBytes(StandardCharsets.UTF_8)), + Arguments.arguments(createBytes("中".getBytes(StandardCharsets.UTF_8), 1024 * 1024 * 16)), + Arguments.arguments(createBytes("中".getBytes(StandardCharsets.UTF_8), 1024 * 1024 * 16 + 1)), + Arguments.arguments(createBytes("中".getBytes(StandardCharsets.UTF_8), 1024 * 1024 * 32)), + Arguments.arguments(createBytes("中".getBytes(StandardCharsets.UTF_8), 43214321)), + Arguments.arguments("特殊\n\r\t字符".getBytes(StandardCharsets.UTF_8)) + ); + } + + @ParameterizedTest(name = "[{index}] {arguments}") + @MethodSource("provideTestData") + void testReadString(byte[] inputBytes) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutput dataOutput = new DataOutputStream(bos); + dataOutput.writeInt(inputBytes.length); + dataOutput.write(inputBytes); + String result = Text.readString(new DataInputStream(new ByteArrayInputStream(bos.toByteArray()))); + Assertions.assertEquals(new String(inputBytes, StandardCharsets.UTF_8), result); + } + + private static byte[] createBytes(byte[] metaBytes, int scala) { + byte[] bytes = new byte[metaBytes.length * scala]; + for (int i = 0; i < scala; i++) { + System.arraycopy(metaBytes, 0, bytes, i * metaBytes.length, metaBytes.length); + } + return bytes; + } +} 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org