leekeiabstraction commented on code in PR #430:
URL: https://github.com/apache/fluss-rust/pull/430#discussion_r2900626203


##########
crates/examples/src/example_table.rs:
##########
@@ -15,6 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

Review Comment:
   I wonder whether we should go this route. If I understand correctly, the RSS 
bloat comes from the Rust side lacking memory pooling, i.e. creating a 
MemoryLogRecordsArrowBuilder and discarding it on every append. The proper fix 
for the issue would be to implement memory pooling on the Rust side as well.
   
   While I do like that this fixes the issue without implementing memory 
pooling immediately, I think we should consider whether it's worth advising 
users to use jemalloc to mitigate the issue.
   
   I am not too opinionated. If there are strong signals of production use of 
the 0.1 client and performance is an issue, this is certainly low-hanging fruit 
that we can take.
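   
   As a rough illustration of the pooling idea, here is a minimal sketch using 
only the standard library. `BufferPool` and its methods are hypothetical names, 
not existing fluss-rust APIs, and a real pool would also need a size limit and 
integration with the Arrow builders:
   
   ```rust
   use std::sync::Mutex;
   
   /// Hypothetical pool of reusable byte buffers (not part of fluss-rust).
   pub struct BufferPool {
       free: Mutex<Vec<Vec<u8>>>,
       buffer_capacity: usize,
   }
   
   impl BufferPool {
       pub fn new(buffer_capacity: usize) -> Self {
           Self {
               free: Mutex::new(Vec::new()),
               buffer_capacity,
           }
       }
   
       /// Hand out a pooled buffer if one is idle, otherwise allocate a new one.
       pub fn acquire(&self) -> Vec<u8> {
           self.free
               .lock()
               .unwrap()
               .pop()
               .unwrap_or_else(|| Vec::with_capacity(self.buffer_capacity))
       }
   
       /// Return a buffer to the pool. `clear()` drops the contents but keeps
       /// the allocation, so the next `acquire()` does not hit the allocator.
       pub fn release(&self, mut buf: Vec<u8>) {
           buf.clear();
           self.free.lock().unwrap().push(buf);
       }
   
       /// Number of buffers currently idle in the pool.
       pub fn idle(&self) -> usize {
           self.free.lock().unwrap().len()
       }
   }
   ```
   
   With something like this, each append would `acquire()` a buffer and 
`release()` it after the batch is sent, instead of allocating and dropping one 
per append.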



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -151,6 +151,7 @@ pub const NO_BATCH_SEQUENCE: i32 = -1;
 
 pub const BUILDER_DEFAULT_OFFSET: i64 = 0;
 
+// TODO: Switch to byte-size-based is_full() like Java's ArrowWriter instead of a hard record cap.
 pub const DEFAULT_MAX_RECORD: i32 = 256;

Review Comment:
   Java uses an initial capacity of 1024. Is there a motivation for using a 
lower capacity here?
   
   Also, consider naming this an initial capacity? My understanding is that 
this is not a maximum record count but an initial capacity that Arrow can grow 
beyond.
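   
   To illustrate the distinction, a small sketch with a plain `Vec` standing in 
for an Arrow builder (an assumption made to keep the example free of 
dependencies). `INITIAL_CAPACITY` is a hypothetical name following the rename 
suggested above:
   
   ```rust
   /// Hypothetical name; 256 is the value from the diff.
   pub const INITIAL_CAPACITY: usize = 256;
   
   /// Pushes `n` values into a buffer pre-sized to INITIAL_CAPACITY and
   /// returns its final (len, capacity).
   pub fn fill(n: usize) -> (usize, usize) {
       let mut values: Vec<i32> = Vec::with_capacity(INITIAL_CAPACITY);
       for i in 0..n {
           // Pushing past the reserved capacity reallocates; it does not fail.
           values.push(i as i32);
       }
       (values.len(), values.capacity())
   }
   ```
   
   Calling `fill(1000)` ends with 1000 elements, so the pre-sized capacity acts 
as an allocation hint rather than a limit on the number of records.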



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -251,26 +253,47 @@ impl RowAppendRecordBatchBuilder {
         })
     }
 
-    fn create_builder(data_type: &arrow_schema::DataType) -> Result<Box<dyn ArrayBuilder>> {
+    fn create_builder(
+        data_type: &arrow_schema::DataType,
+        capacity: usize,
+    ) -> Result<Box<dyn ArrayBuilder>> {
         match data_type {
-            arrow_schema::DataType::Int8 => Ok(Box::new(Int8Builder::new())),
-            arrow_schema::DataType::Int16 => Ok(Box::new(Int16Builder::new())),
-            arrow_schema::DataType::Int32 => Ok(Box::new(Int32Builder::new())),
-            arrow_schema::DataType::Int64 => Ok(Box::new(Int64Builder::new())),
-            arrow_schema::DataType::UInt8 => Ok(Box::new(UInt8Builder::new())),
-            arrow_schema::DataType::UInt16 => Ok(Box::new(UInt16Builder::new())),
-            arrow_schema::DataType::UInt32 => Ok(Box::new(UInt32Builder::new())),
-            arrow_schema::DataType::UInt64 => Ok(Box::new(UInt64Builder::new())),
-            arrow_schema::DataType::Float32 => Ok(Box::new(Float32Builder::new())),
-            arrow_schema::DataType::Float64 => Ok(Box::new(Float64Builder::new())),
-            arrow_schema::DataType::Boolean => Ok(Box::new(BooleanBuilder::new())),
-            arrow_schema::DataType::Utf8 => Ok(Box::new(StringBuilder::new())),
-            arrow_schema::DataType::Binary => Ok(Box::new(BinaryBuilder::new())),
-            arrow_schema::DataType::FixedSizeBinary(size) => {
-                Ok(Box::new(FixedSizeBinaryBuilder::new(*size)))
+            arrow_schema::DataType::Int8 => Ok(Box::new(Int8Builder::with_capacity(capacity))),
+            arrow_schema::DataType::Int16 => Ok(Box::new(Int16Builder::with_capacity(capacity))),
+            arrow_schema::DataType::Int32 => Ok(Box::new(Int32Builder::with_capacity(capacity))),
+            arrow_schema::DataType::Int64 => Ok(Box::new(Int64Builder::with_capacity(capacity))),
+            arrow_schema::DataType::UInt8 => Ok(Box::new(UInt8Builder::with_capacity(capacity))),
+            arrow_schema::DataType::UInt16 => Ok(Box::new(UInt16Builder::with_capacity(capacity))),
+            arrow_schema::DataType::UInt32 => Ok(Box::new(UInt32Builder::with_capacity(capacity))),
+            arrow_schema::DataType::UInt64 => Ok(Box::new(UInt64Builder::with_capacity(capacity))),
+            arrow_schema::DataType::Float32 => {
+                Ok(Box::new(Float32Builder::with_capacity(capacity)))
             }
+            arrow_schema::DataType::Float64 => {
+                Ok(Box::new(Float64Builder::with_capacity(capacity)))
+            }
+            arrow_schema::DataType::Boolean => {
+                Ok(Box::new(BooleanBuilder::with_capacity(capacity)))
+            }
+            arrow_schema::DataType::Utf8 => {
+                // Estimate 64 bytes average per string value
+                Ok(Box::new(StringBuilder::with_capacity(
+                    capacity,
+                    capacity * 64,
+                )))
+            }
+            arrow_schema::DataType::Binary => {
+                // Estimate 64 bytes average per binary value
+                Ok(Box::new(BinaryBuilder::with_capacity(
+                    capacity,
+                    capacity * 64,

Review Comment:
   nit: these are magic numbers; they could be defined as a const.
   
   Also, if we want to follow the Java side, BaseVariableWidthVector uses a 
multiplier of 8.
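   
   A minimal sketch of what pulling the number into a const could look like. 
The constant and function names are hypothetical, and whether 64 or 8 is the 
better estimate is left open:
   
   ```rust
   /// Estimated average bytes per variable-width value; 64 is the figure
   /// used in the diff. The constant name is hypothetical.
   pub const ESTIMATED_BYTES_PER_VALUE: usize = 64;
   
   /// Data-buffer size hint for a variable-width builder that is expected
   /// to hold `capacity` values of roughly `bytes_per_value` bytes each.
   pub fn data_capacity_hint(capacity: usize, bytes_per_value: usize) -> usize {
       // saturating_mul avoids an overflow panic for very large capacities.
       capacity.saturating_mul(bytes_per_value)
   }
   ```
   
   The result would then be passed as the second argument to 
`StringBuilder::with_capacity` and `BinaryBuilder::with_capacity`. For a 
capacity of 256 this gives 16384 bytes with the current estimate of 64, and 
2048 bytes with a multiplier of 8.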



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
