This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-streamloader.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a3664b  fix streamloader use too much memory (#19)
3a3664b is described below

commit 3a3664b5fb62bbe7259b8084e41a147d770bddef
Author: hui lai <1353307...@qq.com>
AuthorDate: Thu Jun 27 15:57:15 2024 +0800

    fix streamloader use too much memory (#19)
---
 main.go | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/main.go b/main.go
index 1677b4f..7d08378 100644
--- a/main.go
+++ b/main.go
@@ -38,7 +38,7 @@ import (
 const (
        fileBufferSize              = 16 * 1024 * 1024 // 16MB
        bufferSize                  = 1 * 1024 * 1024  // 1MB
-       queueSize                   = 50 * 1024 * 1024 //50MB
+       defaultQueueSize            = 100
        defaultTimeout              = 60 * 60 * 10
        defaultBatchRows            = 4096
        defaultBatchBytes           = 943718400
@@ -80,6 +80,7 @@ var (
        retryInterval        int
        retryInfo            map[int]int
        showVersion          bool
+       queueSize            int
 
        bufferPool = sync.Pool{
                New: func() interface{} {
@@ -114,6 +115,7 @@ func initFlags() {
        flag.IntVar(&retryInterval, "auto_retry_interval", 
defaultRetryInterval, "retry failure")
        flag.BoolVar(&debug, "debug", false, "enable debug")
        flag.BoolVar(&showVersion, "version", false, "Display the version")
+       flag.IntVar(&queueSize, "queue_size", defaultQueueSize, "memory queue 
size")
 
        flag.Parse()
 
@@ -183,6 +185,7 @@ func initFlags() {
                fmt.Println("retry_info: ", retry)
                fmt.Println("retry_times: ", maxRetryTimes)
                fmt.Println("retry_interval: ", retryInterval)
+               fmt.Println("queue_size: ", queueSize)
        }
 
        utils.InitLog(logLevel)
@@ -291,6 +294,11 @@ func paramCheck() {
                log.Warnf("retryInterval invalid: %d, replace with default 
value: %d", retryInterval, defaultRetryInterval)
                retryInterval = defaultRetryInterval
        }
+
+       if queueSize <= 0 {
+               log.Warnf("queueSize invalid: %d, replace with default value: 
%d", queueSize, defaultQueueSize)
+               queueSize = defaultQueueSize
+       }
 }
 
 func calculateAndCheckWorkers(reader *file.FileReader, size int64) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to