fresh-borzoni commented on code in PR #449:
URL: https://github.com/apache/fluss-rust/pull/449#discussion_r3025241226


##########
crates/fluss/src/config.rs:
##########
@@ -350,6 +350,39 @@ impl Config {
         }
         Ok(())
     }
+    pub fn validate_numeric_fields(&self) -> Result<(), String> {
+        if self.writer_request_max_size <= 0 {
+            return Err("writer_request_max_size must be > 0".to_string());
+        }
+        if self.writer_batch_size <= 0 {
+            return Err("writer_batch_size must be > 0".to_string());
+        }
+        if self.writer_batch_timeout_ms < 0 {
+            return Err("writer_batch_timeout_ms must be >= 0".to_string());
+        }
+        if self.remote_file_download_thread_num == 0 {
+            return Err("remote_file_download_thread_num must be > 0".to_string());
+        }
+        if self.scanner_remote_log_prefetch_num == 0 {
+            return Err("scanner_remote_log_prefetch_num must be > 0".to_string());
+        }
+        if self.scanner_remote_log_read_concurrency == 0 {

Review Comment:
   We already have .max(1) in remote_log.rs; let's remove it from there now that zero is rejected here.



##########
crates/fluss/src/config.rs:
##########
@@ -350,6 +350,39 @@ impl Config {
         }
         Ok(())
     }
+    pub fn validate_numeric_fields(&self) -> Result<(), String> {
+        if self.writer_request_max_size <= 0 {
+            return Err("writer_request_max_size must be > 0".to_string());
+        }
+        if self.writer_batch_size <= 0 {
+            return Err("writer_batch_size must be > 0".to_string());
+        }
+        if self.writer_batch_timeout_ms < 0 {
+            return Err("writer_batch_timeout_ms must be >= 0".to_string());
+        }
+        if self.remote_file_download_thread_num == 0 {
+            return Err("remote_file_download_thread_num must be > 0".to_string());
+        }
+        if self.scanner_remote_log_prefetch_num == 0 {
+            return Err("scanner_remote_log_prefetch_num must be > 0".to_string());
+        }
+        if self.scanner_remote_log_read_concurrency == 0 {
+            return Err("scanner_remote_log_read_concurrency must be > 0".to_string());
+        }
+        if self.scanner_log_max_poll_records == 0 {

Review Comment:
   In Java this value is allowed, though it arguably makes no sense. We need to match Java,
   so let's drop this check for now and file an issue in the main repo.
   The same goes for connect_timeout_ms.



##########
crates/fluss/src/config.rs:
##########
@@ -350,6 +350,39 @@ impl Config {
         }
         Ok(())
     }
+    pub fn validate_numeric_fields(&self) -> Result<(), String> {
+        if self.writer_request_max_size <= 0 {
+            return Err("writer_request_max_size must be > 0".to_string());
+        }
+        if self.writer_batch_size <= 0 {
+            return Err("writer_batch_size must be > 0".to_string());
+        }
+        if self.writer_batch_timeout_ms < 0 {
+            return Err("writer_batch_timeout_ms must be >= 0".to_string());
+        }
+        if self.remote_file_download_thread_num == 0 {
+            return Err("remote_file_download_thread_num must be > 0".to_string());
+        }
+        if self.scanner_remote_log_prefetch_num == 0 {
+            return Err("scanner_remote_log_prefetch_num must be > 0".to_string());
+        }
+        if self.scanner_remote_log_read_concurrency == 0 {
+            return Err("scanner_remote_log_read_concurrency must be > 0".to_string());
+        }
+        if self.scanner_log_max_poll_records == 0 {
+            return Err("scanner_log_max_poll_records must be > 0".to_string());
+        }
+        if self.writer_max_inflight_requests_per_bucket == 0 {
+            return Err("writer_max_inflight_requests_per_bucket must be > 0".to_string());
+        }
+        if self.writer_buffer_memory_size == 0 {
+            return Err("writer_buffer_memory_size must be > 0".to_string());
+        }
+        if self.connect_timeout_ms == 0 {
+            return Err("connect_timeout_ms must be > 0".to_string());
+        }
+        Ok(())

Review Comment:
   We need these checks as well:
    - writer_batch_size <= writer_request_max_size: otherwise batches never drain, because they exceed the max size defined for a request
    - writer_batch_size <= writer_buffer_memory_size: otherwise, if a single full batch exceeds max_size, the drain loop can never pick it up and send it
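
The two cross-field checks above could look roughly like the sketch below. `WriterLimits` and `validate_batch_bounds` are hypothetical names used only for illustration, and the field types are guesses based on the `<= 0` and `== 0` comparisons in the diff; the real `Config` may differ.

```rust
/// Hypothetical stand-in for the writer-related fields of `Config`.
/// Field types are assumptions inferred from the comparisons in the diff.
pub struct WriterLimits {
    pub writer_batch_size: i32,
    pub writer_request_max_size: i32,
    pub writer_buffer_memory_size: usize,
}

impl WriterLimits {
    /// Cross-field checks: a full batch must fit both in a single request
    /// and in the writer's buffer memory.
    pub fn validate_batch_bounds(&self) -> Result<(), String> {
        if self.writer_batch_size > self.writer_request_max_size {
            return Err(format!(
                "writer_batch_size ({}) must be <= writer_request_max_size ({})",
                self.writer_batch_size, self.writer_request_max_size
            ));
        }
        // Widen both sides to i128 so the signed/unsigned comparison is lossless.
        if (self.writer_batch_size as i128) > (self.writer_buffer_memory_size as i128) {
            return Err(format!(
                "writer_batch_size ({}) must be <= writer_buffer_memory_size ({})",
                self.writer_batch_size, self.writer_buffer_memory_size
            ));
        }
        Ok(())
    }
}
```

Putting both values in the message makes a rejected config easier to diagnose, since either side of the inequality may be the one the user set wrongly.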



##########
crates/fluss/src/client/connection.rs:
##########
@@ -45,6 +45,8 @@ impl FlussConnection {
             .map_err(|msg| Error::IllegalArgument { message: msg })?;
         arg.validate_scanner_fetch()
             .map_err(|msg| Error::IllegalArgument { message: msg })?;
+        arg.validate_numeric_fields()

Review Comment:
   Let's structure this better:
   * validate_scanner - to include all scanner-related configs
   * validate_writer - to include writer configs and idempotence checks
   * validate_security - SASL checks
   
   That way it's more scoped and it's harder to miss something.
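
A minimal sketch of that split, assuming a trimmed-down `Config`. The SASL fields (`sasl_enabled`, `sasl_username`), the SASL rule, and the aggregate `validate` method are hypothetical placeholders, not the actual fluss-rust API; only the three validator names come from the list above.

```rust
/// Trimmed-down, hypothetical stand-in for `Config`. The SASL fields are
/// placeholders; the real struct may name and type them differently.
#[derive(Debug, Clone)]
pub struct Config {
    pub scanner_remote_log_prefetch_num: usize,
    pub scanner_remote_log_read_concurrency: usize,
    pub writer_batch_size: i32,
    pub writer_request_max_size: i32,
    pub sasl_enabled: bool,    // hypothetical field
    pub sasl_username: String, // hypothetical field
}

impl Config {
    /// All scanner-related checks live here.
    pub fn validate_scanner(&self) -> Result<(), String> {
        if self.scanner_remote_log_prefetch_num == 0 {
            return Err("scanner_remote_log_prefetch_num must be > 0".to_string());
        }
        if self.scanner_remote_log_read_concurrency == 0 {
            return Err("scanner_remote_log_read_concurrency must be > 0".to_string());
        }
        Ok(())
    }

    /// Writer checks; idempotence checks would also go here.
    pub fn validate_writer(&self) -> Result<(), String> {
        if self.writer_request_max_size <= 0 {
            return Err("writer_request_max_size must be > 0".to_string());
        }
        if self.writer_batch_size <= 0 {
            return Err("writer_batch_size must be > 0".to_string());
        }
        Ok(())
    }

    /// SASL checks (placeholder rule: a username is required when SASL is on).
    pub fn validate_security(&self) -> Result<(), String> {
        if self.sasl_enabled && self.sasl_username.is_empty() {
            return Err("sasl_username must be set when SASL is enabled".to_string());
        }
        Ok(())
    }

    /// Hypothetical single entry point that runs each scoped validator in turn
    /// and stops at the first error.
    pub fn validate(&self) -> Result<(), String> {
        self.validate_scanner()?;
        self.validate_writer()?;
        self.validate_security()
    }
}
```

In connection.rs, each call (or the aggregate one) could keep the existing `.map_err(|msg| Error::IllegalArgument { message: msg })?` pattern shown in the diff.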


