darren2013 opened a new issue, #506:
URL: https://github.com/apache/arrow-go/issues/506

   ### Describe the bug, including details regarding any error messages, 
version, and platform.
   
   > darren@darrendus-Mac-mini ~ % go tool pprof -text 
http://10.20.183.250:6060/debug/pprof/heap | head -30
   Fetching profile over HTTP from http://10.20.183.250:6060/debug/pprof/heap
   Saved profile in 
/Users/darren/pprof/pprof.log_fusion.alloc_objects.alloc_space.inuse_objects.inuse_space.012.pb.gz
   File: log_fusion
   Build ID: aee9d00c9e5d647228e1f8922ddc80e2115f794f
   Type: inuse_space
   Time: 2025-09-12 19:04:55 CST
   Showing nodes accounting for 14863.66MB, 95.57% of 15552.03MB total
   Dropped 241 nodes (cum <= 77.76MB)
         flat  flat%   sum%        cum   cum%
    3141.12MB 20.20% 20.20%  3141.12MB 20.20%  
github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet.NewColumnMetaData
 (inline)
    2053.06MB 13.20% 33.40%  2053.06MB 13.20%  
github.com/apache/arrow-go/v18/parquet/metadata.(*ColumnChunkMetaDataBuilder).Finish
    2022.75MB 13.01% 46.41%  2022.75MB 13.01%  
github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet.NewStatistics 
(inline)
    1683.18MB 10.82% 57.23%  6348.41MB 40.82%  
github.com/apache/arrow-go/v18/parquet/file.(*columnWriter).Close
    1630.68MB 10.49% 67.71%  5779.84MB 37.16%  
github.com/apache/arrow-go/v18/parquet/metadata.(*RowGroupMetaDataBuilder).NextColumnChunk
    1389.17MB  8.93% 76.65% 14417.33MB 92.70%  
github.com/apache/arrow-go/v18/parquet/file.(*rowGroupWriter).NextColumn
     871.98MB  5.61% 82.25%   871.98MB  5.61%  
github.com/apache/arrow-go/v18/arrow/memory.(*GoAllocator).Allocate 
(partial-inline)
     764.03MB  4.91% 87.17%  1008.04MB  6.48%  
github.com/apache/arrow-go/v18/parquet/metadata.NewColumnChunkMetaDataBuilderWithContents
 (inline)
     344.82MB  2.22% 89.38%   344.82MB  2.22%  bufio.NewWriterSize (inline)
        244MB  1.57% 90.95%      244MB  1.57%  
github.com/apache/arrow-go/v18/parquet/schema.ColumnPathFromNode
     215.05MB  1.38% 92.33%   269.10MB  1.73%  
github.com/apache/arrow-go/v18/parquet/pqarrow.writeDenseArrow
     128.99MB  0.83% 93.16%   128.99MB  0.83%  
github.com/apache/arrow-go/v18/parquet/metadata.NewRowGroupMetaDataBuilder 
(inline)
      97.67MB  0.63% 93.79%    97.67MB  0.63%  reflect.mapassign_faststr0
      89.58MB  0.58% 94.37%    89.58MB  0.58%  
github.com/apache/arrow-go/v18/internal/hashing.NewHashTable[go.shape.int32] 
(inline)
      37.51MB  0.24% 94.61%   428.80MB  2.76%  
github.com/apache/arrow-go/v18/parquet/metadata.NewByteArrayStatistics
      29.01MB  0.19% 94.80%   824.28MB  5.30%  
github.com/apache/arrow-go/v18/parquet/file.NewByteArrayColumnChunkWriter
         29MB  0.19% 94.98%   179.68MB  1.16%  
encoding/json.(*decodeState).object
      23.50MB  0.15% 95.13%   711.76MB  4.58%  
github.com/apache/arrow-go/v18/parquet/internal/encoding.byteArrayEncoderTraits.Encoder
      16.50MB  0.11% 95.24% 14825.96MB 95.33%  
github.com/apache/arrow-go/v18/parquet/pqarrow.(*arrowColumnWriter).Write
         16MB   0.1% 95.34%   701.82MB  4.51%  
github.com/apache/arrow-go/v18/parquet/internal/encoding.NewPooledBufferWriter 
(inline)
      14.52MB 0.093% 95.44%   194.20MB  1.25%  
github.com/darren2013/log_fusion_source/internal/source.FileConsumer
          7MB 0.045% 95.48%   351.82MB  2.26%  
github.com/apache/thrift/lib/go/thrift.NewStreamTransportW (inline)
          5MB 0.032% 95.51%    95.58MB  0.61%  
github.com/apache/arrow-go/v18/internal/hashing.NewBinaryMemoTable
   
   
   the memory keeps increasing until there is no free memory left to use.
   
   
   the write code is as follows:
   `// flushBatch converts one batch of events into an Arrow record and writes it
   // to the Parquet file through p.arrowWriter. Returns nil on success, or an
   // error if the schema/writer is uninitialized or the write fails.
   //
   // NOTE(review): p.recordBuilder is released and re-created on every call and
   // the new builder is retained on the struct until the next call — presumably
   // so another method can see it; verify nothing else uses it concurrently.
   func (p *ParquetProcessor) flushBatch(entities []model.Event) error {
       if len(entities) == 0 {
           return nil
       }

       // Check that the Arrow schema has been initialized.
       if p.arrowSchema == nil {
           return fmt.Errorf("arrow schema not initialized")
       }

       // Check that the Arrow writer has been initialized.
       if p.arrowWriter == nil {
           return fmt.Errorf("arrow writer not initialized")
       }

       // Current time; every row in this batch gets the same "timestamp" value.
       now := time.Now().UnixMilli()

       // Optimize the use of the Arrow record builder:
       // create a fresh RecordBuilder each call so memory is released correctly.
       if p.recordBuilder != nil {
           p.recordBuilder.Release()
           p.recordBuilder = nil
       }
       p.recordBuilder = array.NewRecordBuilder(p.mem, p.arrowSchema)

       // Populate the builder column by column, driven by the schema fields.
       for i, field := range p.arrowSchema.Fields() {
           fieldName := field.Name

           // Special case: the "timestamp" field is filled with `now` for
           // every row, regardless of any value in entity.Values.
           if fieldName == "timestamp" {
               builder := p.recordBuilder.Field(i).(*array.Int64Builder)
               for j := 0; j < len(entities); j++ {
                   builder.Append(now)
               }
               continue
           }

           // Fill the column according to the field's Arrow type. Missing or
           // nil values become nulls; otherwise the dynamic value is coerced.
           switch field.Type.ID() {
           case arrow.STRING:
               builder := p.recordBuilder.Field(i).(*array.StringBuilder)
               for _, entity := range entities {
                   val, ok := entity.Values[fieldName]
                   if !ok || val == nil {
                       builder.AppendNull()
                   } else {
                       var strVal string
                       switch v := val.(type) {
                       case string:
                           strVal = v
                       default:
                           // Fall back to the default formatting of the value.
                           strVal = fmt.Sprintf("%v", v)
                       }
                       builder.Append(strVal)
                   }
               }

           case arrow.INT32:
               builder := p.recordBuilder.Field(i).(*array.Int32Builder)
               for _, entity := range entities {
                   val, ok := entity.Values[fieldName]
                   if !ok || val == nil {
                       builder.AppendNull()
                   } else {
                       var intVal int32
                       switch v := val.(type) {
                       case int:
                           intVal = int32(v)
                       case int64:
                           intVal = int32(v)
                       case float64:
                           intVal = int32(v)
                       case string:
                           // Strings are parsed as floats, then truncated.
                           // NOTE(review): on parse failure (and for any type
                           // not listed above) intVal stays 0 and 0 is
                           // appended rather than a null — confirm intended.
                           var f float64
                           if _, err := fmt.Sscanf(v, "%f", &f); err == nil {
                               intVal = int32(f)
                           }
                       }
                       builder.Append(intVal)
                   }
               }

           case arrow.INT64:
               builder := p.recordBuilder.Field(i).(*array.Int64Builder)
               for _, entity := range entities {
                   val, ok := entity.Values[fieldName]
                   if !ok || val == nil {
                       builder.AppendNull()
                   } else {
                       var int64Val int64
                       switch v := val.(type) {
                       case int:
                           int64Val = int64(v)
                       case int64:
                           int64Val = v
                       case float64:
                           int64Val = int64(v)
                       case time.Time:
                           int64Val = v.UnixMilli()
                       case string:
                           // First try a numeric parse; if that fails, try a
                           // list of known timestamp layouts.
                           var f float64
                           if _, err := fmt.Sscanf(v, "%f", &f); err == nil {
                               int64Val = int64(f)
                           } else {
                               // Try to parse the string as a timestamp.
                               formats := []string{
                                   time.RFC3339,
                                   "2006-01-02 15:04:05",
                                   "2006/01/02 15:04:05",
                                   "01/02/2006 15:04:05",
                                   "02/01/2006 15:04:05",
                               }
                               // NOTE(review): if no layout matches, 0 is
                               // appended (epoch), not a null.
                               for _, format := range formats {
                                   if t, err := time.Parse(format, v); err == nil {
                                       int64Val = t.UnixMilli()
                                       break
                                   }
                               }
                           }
                       }
                       builder.Append(int64Val)
                   }
               }

           case arrow.FLOAT64:
               builder := p.recordBuilder.Field(i).(*array.Float64Builder)
               for _, entity := range entities {
                   val, ok := entity.Values[fieldName]
                   if !ok || val == nil {
                       builder.AppendNull()
                   } else {
                       var floatVal float64
                       switch v := val.(type) {
                       case float64:
                           floatVal = v
                       case float32:
                           floatVal = float64(v)
                       case int:
                           floatVal = float64(v)
                       case int64:
                           floatVal = float64(v)
                       case string:
                           // Parse error is deliberately ignored; on failure
                           // floatVal stays 0, matching the int cases above.
                           fmt.Sscanf(v, "%f", &floatVal)
                       }
                       builder.Append(floatVal)
                   }
               }

           case arrow.BOOL:
               builder := p.recordBuilder.Field(i).(*array.BooleanBuilder)
               for _, entity := range entities {
                   val, ok := entity.Values[fieldName]
                   if !ok || val == nil {
                       builder.AppendNull()
                   } else {
                       var boolVal bool
                       switch v := val.(type) {
                       case bool:
                           boolVal = v
                       case string:
                           // Only these exact spellings are recognized; any
                           // other string yields false.
                           switch v {
                           case "true", "True", "TRUE", "1":
                               boolVal = true
                           case "false", "False", "FALSE", "0":
                               boolVal = false
                           }
                       case int:
                           boolVal = v != 0
                       case float64:
                           boolVal = v != 0
                       }
                       builder.Append(boolVal)
                   }
               }

           default:
               // Unsupported field types are logged and skipped; note the
               // column builder receives no appends for this batch.
               log.Printf("警告: 字段 %s 的类型 %s 不支持", fieldName, field.Type.ID())
           }
       }

       // Build the Arrow record batch; release it once it has been written.
       recordBatch := p.recordBuilder.NewRecordBatch()
       defer recordBatch.Release()

       // Write to the Parquet file using the Arrow Go v18 writer API.
       if err := p.arrowWriter.Write(recordBatch); err != nil {
           return fmt.Errorf("failed to write record batch to parquet: %w", err)
       }

       // Update write counters and the last-flush timestamp.
       p.writeCount++
       p.lastFlushTime = time.Now()
       p.currentRowGroupSize += int64(len(entities))
       p.totalRecords += int64(len(entities))

       // Gentle memory-management strategy: periodically log write statistics.
       if p.writeCount%1000 == 0 {
           log.Printf("ParquetProcessor[%d] 已写入 %d 次,行组大小: %d,总记录: %d",
               p.instanceID, p.writeCount, p.currentRowGroupSize, p.totalRecords)
       }

       // Memory-pressure hint: warn (log only) when a batch is large.
       if len(entities) > 1000 {
           log.Printf("ParquetProcessor[%d] 警告:批处理大小较大 (%d),可能增加内存压力", p.instanceID, len(entities))
       }

       // Batch-level resource release: every 10 batches, or after a batch of
       // 1000+ records, release writer resources to balance speed vs. memory.
       // NOTE(review): forceCppMemoryRelease is not shown here; arrow-go v18 is
       // pure Go, so what "C++ resources" means should be verified.
       if p.writeCount%10 == 0 || len(entities) >= 1000 {
           if err := p.forceCppMemoryRelease(); err != nil {
               log.Printf("ParquetProcessor[%d] C++资源释放失败: %v", p.instanceID, err)
               // Do not return the error; continue processing later data.
           }
       }

       // Tier 2 (disabled): deep memory release / file segmentation every
       // 100k records; threshold was lowered to guard against OOM.
       /*if p.totalRecords > 0 && p.totalRecords%100000 == 0 {
           log.Printf("ParquetProcessor[%d] 总记录数达到 %d,开始深度内存释放", p.instanceID, p.totalRecords)

           if err := p.forceRowGroupFlush(); err != nil {
               log.Printf("ParquetProcessor[%d] 深度内存释放失败: %v", p.instanceID, err)
           } else {
               p.currentRowGroupSize = 0 // reset the row-group counter
               log.Printf("ParquetProcessor[%d] 深度内存释放成功,新文件已创建", p.instanceID)
           }
       }*/

       // Record the number of successfully written rows via TotalCounter.CountN.
       (*p.TotalCounter).CountN(int64(len(entities)))

       return nil
   }`
   
   ### Component(s)
   
   Parquet


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to