pratik-tt opened a new issue, #360: URL: https://github.com/apache/arrow-go/issues/360
### Describe the bug, including details regarding any error messages, version, and platform.

The writer below is configured with `parquet.WithCompressionLevel(3)` (see the marked line in `NewParquetWriter`), but the metadata of the resulting file reports `ZSTD(ZstdLevel(1))`:

```
parquet version: 2
num of rows: 100
created by: wal-cake
message schema {
  REQUIRED BYTE_ARRAY table;
  REQUIRED BYTE_ARRAY operation;
  REQUIRED INT64 timestamp;
  REQUIRED INT64 lsn;
  REQUIRED BYTE_ARRAY data_json;
}

num of row groups: 1
row groups:

row group 0:
--------------------------------------------------------------------------------
total byte size: 122632
num of rows: 100
num of columns: 5
columns:

column 0:
--------------------------------------------------------------------------------
column type: BYTE_ARRAY
column path: "table"
encodings: RLE_DICTIONARY PLAIN RLE
file path: N/A
file offset: 116
num of values: 100
compression: ZSTD(ZstdLevel(1))
total compressed size (in bytes): 112
total uncompressed size (in bytes): 86
data page offset: 47
index page offset: N/A
dictionary page offset: 4
```

```go
// ParquetWriter interface defines methods for writing CDC events to Parquet format
type ParquetWriter interface {
	// WriteToBuffer writes events to an in-memory buffer and returns the bytes
	WriteToBuffer(events []*model.CDCEvent) ([]byte, error)

	GetCompressionCodec() string

	// AddFilter adds an event filter to the writer
	AddFilter(filter EventFilter)
}

type parquetWriter struct {
	props   *parquet.WriterProperties
	filters []EventFilter
}

// writerTell is a wrapper that implements io.Writer and has a Tell method
type writerTell struct {
	w   io.Writer
	pos int64
}

func (w *writerTell) Write(p []byte) (int, error) {
	n, err := w.w.Write(p)
	w.pos += int64(n)
	return n, err
}

func (w *writerTell) Tell() int64 {
	return w.pos
}

// NewParquetWriter creates a new Parquet writer with ZSTD compression
func NewParquetWriter() ParquetWriter {
	// Create writer properties with ZSTD compression
	props := parquet.NewWriterProperties(
		parquet.WithCompression(compress.Codecs.Zstd),
		// THIS PART WHERE WE ARE DEFINING LEVEL
		parquet.WithCompressionLevel(3),
		parquet.WithDictionaryDefault(true),
		parquet.WithStats(true),
		parquet.WithCreatedBy("wal-cake"),
	)

	return &parquetWriter{
		props:   props,
		filters: make([]EventFilter, 0),
	}
}

func (w *parquetWriter) GetCompressionCodec() string {
	return w.props.Compression().String()
}

// AddFilter adds an event filter to the writer
func (w *parquetWriter) AddFilter(filter EventFilter) {
	w.filters = append(w.filters, filter)
}

// createSchema creates the Parquet schema for CDC events
func (w *parquetWriter) createSchema() *schema.Schema {
	// Create schema nodes for each column
	tableNode, err := schema.NewPrimitiveNode("table", parquet.Repetitions.Required, parquet.Types.ByteArray, -1, -1)
	if err != nil {
		log.Fatal().Err(err).Msg("create table schema node")
	}

	opNode, err := schema.NewPrimitiveNode("operation", parquet.Repetitions.Required, parquet.Types.ByteArray, -1, -1)
	if err != nil {
		log.Fatal().Err(err).Msg("create operation schema node")
	}

	tsNode, err := schema.NewPrimitiveNode("timestamp", parquet.Repetitions.Required, parquet.Types.Int64, -1, -1)
	if err != nil {
		log.Fatal().Err(err).Msg("create timestamp schema node")
	}

	lsnNode, err := schema.NewPrimitiveNode("lsn", parquet.Repetitions.Required, parquet.Types.Int64, -1, -1)
	if err != nil {
		log.Fatal().Err(err).Msg("create lsn schema node")
	}

	dataNode, err := schema.NewPrimitiveNode("data_json", parquet.Repetitions.Required, parquet.Types.ByteArray, -1, -1)
	if err != nil {
		log.Fatal().Err(err).Msg("create data_json schema node")
	}

	// Create schema
	fields := []schema.Node{tableNode, opNode, tsNode, lsnNode, dataNode}
	root, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, fields, -1)
	if err != nil {
		log.Fatal().Err(err).Msg("create schema")
	}

	return schema.NewSchema(root)
}

// writeEventsToParquet writes the given events to a Parquet file, applying filters
func (w *parquetWriter) writeEventsToParquet(events []*model.CDCEvent, writer io.Writer) error {
	// First count how many events will pass the filter
	validCount := 0
	for _, ev := range events {
		if w.shouldIncludeEvent(ev) {
			validCount++
		}
	}

	// Log filter statistics
	if validCount < len(events) {
		log.Info().Int("total_events", len(events)).Int("filtered_events", len(events)-validCount).Int("written_events", validCount).Msg("filtered events before parquet writing")
	}

	// If no events after filtering, return early
	if validCount == 0 {
		log.Warn().Msg("no events to write after filtering")
		return nil
	}

	// Create a writer that can tell its position
	wt := &writerTell{w: writer}

	// Create parquet writer with schema
	schema := w.createSchema()

	// Create parquet file writer
	fileWriter := file.NewParquetWriter(wt, schema.Root(), file.WithWriterProps(w.props))

	// Create row group with a reasonable size
	rg := fileWriter.AppendRowGroup()

	// Prepare column data arrays once with the exact size needed
	tableData := make([]parquet.ByteArray, validCount)
	opData := make([]parquet.ByteArray, validCount)
	tsData := make([]int64, validCount)
	lsnData := make([]int64, validCount)
	dataJsonValues := make([]parquet.ByteArray, validCount)

	// Fill column data arrays in a single pass
	index := 0
	for _, ev := range events {
		if !w.shouldIncludeEvent(ev) {
			continue
		}

		// Table column
		tableData[index] = []byte(ev.Table)
		// Operation column
		opData[index] = []byte(ev.Operation)
		// Timestamp column
		tsData[index] = ev.Timestamp.UnixNano() / int64(time.Millisecond)
		// LSN column
		lsnData[index] = int64(ev.LSN)

		// Data JSON column
		jsonData, err := json.Marshal(ev.Data)
		if err != nil {
			return fmt.Errorf("marshal data to JSON: %w", err)
		}
		dataJsonValues[index] = jsonData

		index++
	}

	// Write table column
	tableWriter, err := rg.NextColumn()
	if err != nil {
		return fmt.Errorf("next column: %w", err)
	}
	byteArrayWriter := tableWriter.(*file.ByteArrayColumnChunkWriter)
	_, err = byteArrayWriter.WriteBatch(tableData, nil, nil)
	if err != nil {
		return fmt.Errorf("write table column: %w", err)
	}

	// Write operation column
	opWriter, err := rg.NextColumn()
	if err != nil {
		return fmt.Errorf("next column: %w", err)
	}
	opByteArrayWriter := opWriter.(*file.ByteArrayColumnChunkWriter)
	_, err = opByteArrayWriter.WriteBatch(opData, nil, nil)
	if err != nil {
		return fmt.Errorf("write operation column: %w", err)
	}

	// Write timestamp column
	tsWriter, err := rg.NextColumn()
	if err != nil {
		return fmt.Errorf("next column: %w", err)
	}
	tsInt64Writer := tsWriter.(*file.Int64ColumnChunkWriter)
	_, err = tsInt64Writer.WriteBatch(tsData, nil, nil)
	if err != nil {
		return fmt.Errorf("write timestamp column: %w", err)
	}

	// Write LSN column
	lsnWriter, err := rg.NextColumn()
	if err != nil {
		return fmt.Errorf("next column: %w", err)
	}
	lsnInt64Writer := lsnWriter.(*file.Int64ColumnChunkWriter)
	_, err = lsnInt64Writer.WriteBatch(lsnData, nil, nil)
	if err != nil {
		return fmt.Errorf("write lsn column: %w", err)
	}

	// Write data JSON column
	dataWriter, err := rg.NextColumn()
	if err != nil {
		return fmt.Errorf("next column: %w", err)
	}
	dataByteArrayWriter := dataWriter.(*file.ByteArrayColumnChunkWriter)
	_, err = dataByteArrayWriter.WriteBatch(dataJsonValues, nil, nil)
	if err != nil {
		return fmt.Errorf("write data column: %w", err)
	}

	// Close the row group
	if err := rg.Close(); err != nil {
		return fmt.Errorf("close row group: %w", err)
	}

	// Close the file writer
	if err := fileWriter.Close(); err != nil {
		return fmt.Errorf("close file writer: %w", err)
	}

	return nil
}

// shouldIncludeEvent applies all filters to determine if an event should be included
func (w *parquetWriter) shouldIncludeEvent(event *model.CDCEvent) bool {
	for _, filter := range w.filters {
		if !filter(event) {
			return false
		}
	}
	return true
}

// WriteToBuffer writes the given events to an in-memory buffer and returns the bytes.
func (w *parquetWriter) WriteToBuffer(events []*model.CDCEvent) ([]byte, error) {
	log.Info().Int("events", len(events)).Msg("writing CDC events to in-memory parquet buffer")

	// Create an in-memory buffer
	buf := new(bytes.Buffer)

	// Write events to the buffer (filtering happens during writing)
	if err := w.writeEventsToParquet(events, buf); err != nil {
		return nil, err
	}

	log.Info().Int("size", buf.Len()).Msg("successfully wrote CDC events to in-memory parquet buffer")
	return buf.Bytes(), nil
}
```

Thank you for building arrow-go, it's super cool. Thanks in advance for looking at this issue and for any help.

### Component(s)

Parquet
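
In case it helps reproduce this, here is a minimal, self-contained sketch that isolates the compression-level question by writing the same ByteArray column once at ZSTD level 1 and once at level 3 and comparing the output sizes. It reuses only calls that already appear in the code above; the `github.com/apache/arrow-go/v18` import paths, the `writeSample` helper, and the generated payload are illustrative assumptions, not part of the original report:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log"

	"github.com/apache/arrow-go/v18/parquet"
	"github.com/apache/arrow-go/v18/parquet/compress"
	"github.com/apache/arrow-go/v18/parquet/file"
	"github.com/apache/arrow-go/v18/parquet/schema"
)

// writerTell mirrors the wrapper from the report: an io.Writer with a Tell method.
type writerTell struct {
	w   io.Writer
	pos int64
}

func (w *writerTell) Write(p []byte) (int, error) {
	n, err := w.w.Write(p)
	w.pos += int64(n)
	return n, err
}

func (w *writerTell) Tell() int64 { return w.pos }

// writeSample (hypothetical helper) writes one ByteArray column of repetitive
// payloads with the given ZSTD level and returns the encoded file size.
func writeSample(level int) int {
	node, err := schema.NewPrimitiveNode("data", parquet.Repetitions.Required, parquet.Types.ByteArray, -1, -1)
	if err != nil {
		log.Fatal(err)
	}
	root, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, []schema.Node{node}, -1)
	if err != nil {
		log.Fatal(err)
	}

	// Same property setup as in the report, with the level as the only variable.
	props := parquet.NewWriterProperties(
		parquet.WithCompression(compress.Codecs.Zstd),
		parquet.WithCompressionLevel(level),
	)

	buf := new(bytes.Buffer)
	fw := file.NewParquetWriter(&writerTell{w: buf}, root, file.WithWriterProps(props))

	rg := fw.AppendRowGroup()
	cw, err := rg.NextColumn()
	if err != nil {
		log.Fatal(err)
	}
	values := make([]parquet.ByteArray, 10000)
	for i := range values {
		values[i] = []byte(fmt.Sprintf("row %d: a moderately compressible payload", i%100))
	}
	if _, err := cw.(*file.ByteArrayColumnChunkWriter).WriteBatch(values, nil, nil); err != nil {
		log.Fatal(err)
	}
	if err := rg.Close(); err != nil {
		log.Fatal(err)
	}
	if err := fw.Close(); err != nil {
		log.Fatal(err)
	}
	return buf.Len()
}

func main() {
	// If the level option reaches the encoder, the two sizes should
	// usually differ on compressible data.
	fmt.Printf("zstd level 1: %d bytes\n", writeSample(1))
	fmt.Printf("zstd level 3: %d bytes\n", writeSample(3))
}
```

One caveat when reading the metadata dump: the Parquet format stores only the codec per column chunk, not the compression level, so the `ZstdLevel(1)` shown above may be the inspecting tool's default display rather than the level the writer actually used.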