darren2013 opened a new issue, #506: URL: https://github.com/apache/arrow-go/issues/506
### Describe the bug, including details regarding any error messages, version, and platform. > darren@darrendus-Mac-mini ~ % go tool pprof -text http://10.20.183.250:6060/debug/pprof/heap | head -30 Fetching profile over HTTP from http://10.20.183.250:6060/debug/pprof/heap Saved profile in /Users/darren/pprof/pprof.log_fusion.alloc_objects.alloc_space.inuse_objects.inuse_space.012.pb.gz File: log_fusion Build ID: aee9d00c9e5d647228e1f8922ddc80e2115f794f Type: inuse_space Time: 2025-09-12 19:04:55 CST Showing nodes accounting for 14863.66MB, 95.57% of 15552.03MB total Dropped 241 nodes (cum <= 77.76MB) flat flat% sum% cum cum% 3141.12MB 20.20% 20.20% 3141.12MB 20.20% github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet.NewColumnMetaData (inline) 2053.06MB 13.20% 33.40% 2053.06MB 13.20% github.com/apache/arrow-go/v18/parquet/metadata.(*ColumnChunkMetaDataBuilder).Finish 2022.75MB 13.01% 46.41% 2022.75MB 13.01% github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet.NewStatistics (inline) 1683.18MB 10.82% 57.23% 6348.41MB 40.82% github.com/apache/arrow-go/v18/parquet/file.(*columnWriter).Close 1630.68MB 10.49% 67.71% 5779.84MB 37.16% github.com/apache/arrow-go/v18/parquet/metadata.(*RowGroupMetaDataBuilder).NextColumnChunk 1389.17MB 8.93% 76.65% 14417.33MB 92.70% github.com/apache/arrow-go/v18/parquet/file.(*rowGroupWriter).NextColumn 871.98MB 5.61% 82.25% 871.98MB 5.61% github.com/apache/arrow-go/v18/arrow/memory.(*GoAllocator).Allocate (partial-inline) 764.03MB 4.91% 87.17% 1008.04MB 6.48% github.com/apache/arrow-go/v18/parquet/metadata.NewColumnChunkMetaDataBuilderWithContents (inline) 344.82MB 2.22% 89.38% 344.82MB 2.22% bufio.NewWriterSize (inline) 244MB 1.57% 90.95% 244MB 1.57% github.com/apache/arrow-go/v18/parquet/schema.ColumnPathFromNode 215.05MB 1.38% 92.33% 269.10MB 1.73% github.com/apache/arrow-go/v18/parquet/pqarrow.writeDenseArrow 128.99MB 0.83% 93.16% 128.99MB 0.83% 
github.com/apache/arrow-go/v18/parquet/metadata.NewRowGroupMetaDataBuilder (inline) 97.67MB 0.63% 93.79% 97.67MB 0.63% reflect.mapassign_faststr0 89.58MB 0.58% 94.37% 89.58MB 0.58% github.com/apache/arrow-go/v18/internal/hashing.NewHashTable[go.shape.int32] (inline) 37.51MB 0.24% 94.61% 428.80MB 2.76% github.com/apache/arrow-go/v18/parquet/metadata.NewByteArrayStatistics 29.01MB 0.19% 94.80% 824.28MB 5.30% github.com/apache/arrow-go/v18/parquet/file.NewByteArrayColumnChunkWriter 29MB 0.19% 94.98% 179.68MB 1.16% encoding/json.(*decodeState).object 23.50MB 0.15% 95.13% 711.76MB 4.58% github.com/apache/arrow-go/v18/parquet/internal/encoding.byteArrayEncoderTraits.Encoder 16.50MB 0.11% 95.24% 14825.96MB 95.33% github.com/apache/arrow-go/v18/parquet/pqarrow.(*arrowColumnWriter).Write 16MB 0.1% 95.34% 701.82MB 4.51% github.com/apache/arrow-go/v18/parquet/internal/encoding.NewPooledBufferWriter (inline) 14.52MB 0.093% 95.44% 194.20MB 1.25% github.com/darren2013/log_fusion_source/internal/source.FileConsumer 7MB 0.045% 95.48% 351.82MB 2.26% github.com/apache/thrift/lib/go/thrift.NewStreamTransportW (inline) 5MB 0.032% 95.51% 95.58MB 0.61% github.com/apache/arrow-go/v18/internal/hashing.NewBinaryMemoTable The memory usage is increasing until there is no free memory left to use. The write code is as follows: `func (p *ParquetProcessor) flushBatch(entities []model.Event) error { if len(entities) == 0 { return nil } // 检查Arrow Schema是否已初始化 if p.arrowSchema == nil { return fmt.Errorf("arrow schema not initialized") } // 检查Arrow写入器是否已初始化 if p.arrowWriter == nil { return fmt.Errorf("arrow writer not initialized") } // 当前时间 now := time.Now().UnixMilli() // 优化Arrow Record构建器的使用 // 每次创建新的RecordBuilder以确保内存正确释放 if p.recordBuilder != nil { p.recordBuilder.Release() p.recordBuilder = nil } p.recordBuilder = array.NewRecordBuilder(p.mem, p.arrowSchema) // 按字段填充数据 for i, field := range p.arrowSchema.Fields() { fieldName := field.Name // timestamp字段特殊处理 if fieldName == "timestamp" { builder := 
p.recordBuilder.Field(i).(*array.Int64Builder) for j := 0; j < len(entities); j++ { builder.Append(now) } continue } // 根据字段类型填充数据 switch field.Type.ID() { case arrow.STRING: builder := p.recordBuilder.Field(i).(*array.StringBuilder) for _, entity := range entities { val, ok := entity.Values[fieldName] if !ok || val == nil { builder.AppendNull() } else { var strVal string switch v := val.(type) { case string: strVal = v default: strVal = fmt.Sprintf("%v", v) } builder.Append(strVal) } } case arrow.INT32: builder := p.recordBuilder.Field(i).(*array.Int32Builder) for _, entity := range entities { val, ok := entity.Values[fieldName] if !ok || val == nil { builder.AppendNull() } else { var intVal int32 switch v := val.(type) { case int: intVal = int32(v) case int64: intVal = int32(v) case float64: intVal = int32(v) case string: var f float64 if _, err := fmt.Sscanf(v, "%f", &f); err == nil { intVal = int32(f) } } builder.Append(intVal) } } case arrow.INT64: builder := p.recordBuilder.Field(i).(*array.Int64Builder) for _, entity := range entities { val, ok := entity.Values[fieldName] if !ok || val == nil { builder.AppendNull() } else { var int64Val int64 switch v := val.(type) { case int: int64Val = int64(v) case int64: int64Val = v case float64: int64Val = int64(v) case time.Time: int64Val = v.UnixMilli() case string: var f float64 if _, err := fmt.Sscanf(v, "%f", &f); err == nil { int64Val = int64(f) } else { // 尝试解析时间字符串 formats := []string{ time.RFC3339, "2006-01-02 15:04:05", "2006/01/02 15:04:05", "01/02/2006 15:04:05", "02/01/2006 15:04:05", } for _, format := range formats { if t, err := time.Parse(format, v); err == nil { int64Val = t.UnixMilli() break } } } } builder.Append(int64Val) } } case arrow.FLOAT64: builder := p.recordBuilder.Field(i).(*array.Float64Builder) for _, entity := range entities { val, ok := entity.Values[fieldName] if !ok || val == nil { builder.AppendNull() } else { var floatVal float64 switch v := val.(type) { case float64: floatVal = v 
case float32: floatVal = float64(v) case int: floatVal = float64(v) case int64: floatVal = float64(v) case string: fmt.Sscanf(v, "%f", &floatVal) } builder.Append(floatVal) } } case arrow.BOOL: builder := p.recordBuilder.Field(i).(*array.BooleanBuilder) for _, entity := range entities { val, ok := entity.Values[fieldName] if !ok || val == nil { builder.AppendNull() } else { var boolVal bool switch v := val.(type) { case bool: boolVal = v case string: switch v { case "true", "True", "TRUE", "1": boolVal = true case "false", "False", "FALSE", "0": boolVal = false } case int: boolVal = v != 0 case float64: boolVal = v != 0 } builder.Append(boolVal) } } default: log.Printf("警告: 字段 %s 的类型 %s 不支持", fieldName, field.Type.ID()) } } // 构建Arrow RecordBatch recordBatch := p.recordBuilder.NewRecordBatch() defer recordBatch.Release() // 使用Arrow Go v18最新的方式写入Parquet文件 if err := p.arrowWriter.Write(recordBatch); err != nil { return fmt.Errorf("failed to write record batch to parquet: %w", err) } // 更新写入计数器和时间 p.writeCount++ p.lastFlushTime = time.Now() p.currentRowGroupSize += int64(len(entities)) p.totalRecords += int64(len(entities)) // 温和的内存管理策略:记录写入统计信息 if p.writeCount%1000 == 0 { log.Printf("ParquetProcessor[%d] 已写入 %d 次,行组大小: %d,总记录: %d", p.instanceID, p.writeCount, p.currentRowGroupSize, p.totalRecords) } // 内存优化策略:通过控制批处理大小来减少内存压力 if len(entities) > 1000 { log.Printf("ParquetProcessor[%d] 警告:批处理大小较大 (%d),可能增加内存压力", p.instanceID, len(entities)) } // 🔥 核心内存优化:智能批处理级别的C++资源释放 // 每10批数据或每1000条记录后释放C++对象资源,平衡性能和内存 if p.writeCount%10 == 0 || len(entities) >= 1000 { if err := p.forceCppMemoryRelease(); err != nil { log.Printf("ParquetProcessor[%d] C++资源释放失败: %v", p.instanceID, err) // 不返回错误,继续处理后续数据 } } // 层级2: 每10万条记录进行深度内存释放(文件分段)- 降低阈值以防止内存溢出 /*if p.totalRecords > 0 && p.totalRecords%100000 == 0 { log.Printf("ParquetProcessor[%d] 总记录数达到 %d,开始深度内存释放", p.instanceID, p.totalRecords) if err := p.forceRowGroupFlush(); err != nil { log.Printf("ParquetProcessor[%d] 深度内存释放失败: %v", 
p.instanceID, err) } else { p.currentRowGroupSize = 0 // 重置行组计数器 log.Printf("ParquetProcessor[%d] 深度内存释放成功,新文件已创建", p.instanceID) } }*/ // 调用TotalCounter.CountN方法,记录成功写入的记录数 (*p.TotalCounter).CountN(int64(len(entities))) return nil }` ### Component(s) Parquet -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
