laskoviymishka commented on code in PR #735:
URL: https://github.com/apache/iceberg-go/pull/735#discussion_r2878121065


##########
manifest.go:
##########
@@ -1445,32 +1470,40 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error {
 }
 
 // WriteManifestList writes a list of manifest files to an avro file.
-func WriteManifestList(version int, out io.Writer, snapshotID int64, parentSnapshotID, sequenceNumber *int64, firstRowId int64, files []ManifestFile) (err error) {
+// For v3 tables, it returns the next row ID after assigning first_row_id to data manifests; otherwise 0.
+func WriteManifestList(version int, out io.Writer, snapshotID int64, parentSnapshotID, sequenceNumber *int64, firstRowId int64, files []ManifestFile) (nextRowIDAfter int64, err error) {

Review Comment:
   This is a public function. I would prefer to keep it as is and add a
   separate WriteManifestV3 that returns the additional values.



##########
manifest.go:
##########
@@ -520,6 +524,9 @@ type ManifestFile interface {
        // field in the spec. Each field in the list corresponds to a field in
        // the manifest file's partition spec.
        Partitions() []FieldSummary
+       // FirstRowId returns the first _row_id assigned to rows in this manifest (v3+ data manifests only).
+       // Returns nil for v1/v2 or for delete manifests.
+       FirstRowId() *int64

Review Comment:
   FirstRowId --> FirstRowID
   
   Since this is a public API, renaming it later would be very hard, and our
   other identifiers already use ID, not Id.



##########
table/scanner.go:
##########
@@ -466,12 +466,21 @@ func (scan *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
                if err != nil {
                        return nil, err
                }
-               results = append(results, FileScanTask{
+               task := FileScanTask{
                        File:        e.DataFile(),
                        DeleteFiles: deleteFiles,
                        Start:       0,
                        Length:      e.DataFile().FileSizeBytes(),
-               })
+               }
+               // Row lineage constants for v3: readers use these to synthesize _row_id and _last_updated_sequence_number.
+               if scan.metadata.Version() >= 3 {
+                       task.FirstRowID = e.DataFile().FirstRowID()
+                       seq := e.SequenceNum()

Review Comment:
   I think this should be FileSequenceNum, not SequenceNum.
   
   Something like:
   
   ```go
     if fseq := e.FileSequenceNum(); fseq != nil {
         task.DataSequenceNumber = fseq
     }
   ```



##########
table/arrow_scanner.go:
##########
@@ -340,6 +340,80 @@ func (as *arrowScan) getRecordFilter(ctx context.Context, fileSchema *iceberg.Sc
        return nil, false, nil
 }
 
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema). Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited (synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is kept.
+// rowOffset is the 0-based row index within the current file and is updated so _row_id stays
+// correct across multiple batches from the same file (first_row_id + row_position).
+func synthesizeRowLineageColumns(
+       ctx context.Context,
+       rowOffset *int64,
+       task FileScanTask,
+       batch arrow.RecordBatch,
+       _ *iceberg.Schema,

Review Comment:
   Why is this parameter needed?



##########
manifest.go:
##########
@@ -595,6 +602,11 @@ type ManifestReader struct {
        schemaLoaded        bool
        partitionSpec       PartitionSpec
        partitionSpecLoaded bool
+
+       // nextFirstRowID tracks the next first_row_id to assign when reading v3 data
+       // manifests; used for First Row ID inheritance (null data file first_row_id
+       // gets manifest's first_row_id + sum of preceding null-first_row_id files' record_count).
+       nextFirstRowID *int64

Review Comment:
   I think this *int64 optional pattern makes the whole code much more
   cumbersome. Can we fall back to either a plain int64, or a pair of
   int64 + bool?
   
   The *int64 pattern does make sense in the rest of the codebase for
   Avro-serialized fields that need to round-trip as nullable (e.g.
   FirstRowIDField *int64 on dataFile). But for a private tracking variable
   inside ManifestReader it is the wrong tool: you want a clear "am I even
   doing this?" flag, not pointer indirection.
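
   A minimal sketch of the int64 + bool pair, assuming a stripped-down
   stand-in for ManifestReader. The reader type, newReader and assign are
   hypothetical names used only for illustration:

```go
package main

import "fmt"

// reader is a hypothetical, stripped-down stand-in for ManifestReader.
type reader struct {
	inheritRowIDs  bool  // explicit "am I even doing this?" flag
	nextFirstRowID int64 // only meaningful when inheritRowIDs is true
}

// newReader converts the nullable manifest-level value once, at construction,
// so the rest of the reader never has to nil-check a pointer.
func newReader(manifestFirstRowID *int64) *reader {
	r := &reader{}
	if manifestFirstRowID != nil {
		r.inheritRowIDs = true
		r.nextFirstRowID = *manifestFirstRowID
	}
	return r
}

// assign returns the first row ID for a file with recordCount rows and
// reports whether inheritance is active at all.
func (r *reader) assign(recordCount int64) (int64, bool) {
	if !r.inheritRowIDs {
		return 0, false
	}
	id := r.nextFirstRowID
	r.nextFirstRowID += recordCount
	return id, true
}

func main() {
	base := int64(100)
	r := newReader(&base)

	id, ok := r.assign(10)
	fmt.Println(id, ok) // 100 true
	id, ok = r.assign(5)
	fmt.Println(id, ok) // 110 true

	// A v1/v2 or delete manifest has no first_row_id, so nothing is assigned.
	off := newReader(nil)
	id, ok = off.assign(10)
	fmt.Println(id, ok) // 0 false
}
```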



##########
table/arrow_scanner.go:
##########
@@ -340,6 +340,80 @@ func (as *arrowScan) getRecordFilter(ctx context.Context, fileSchema *iceberg.Sc
        return nil, false, nil
 }
 
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema). Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited (synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is kept.
+// rowOffset is the 0-based row index within the current file and is updated so _row_id stays
+// correct across multiple batches from the same file (first_row_id + row_position).
+func synthesizeRowLineageColumns(
+       ctx context.Context,
+       rowOffset *int64,
+       task FileScanTask,
+       batch arrow.RecordBatch,
+       _ *iceberg.Schema,
+       _ bool,
+) (arrow.RecordBatch, error) {
+       alloc := compute.GetAllocator(ctx)
+       schema := batch.Schema()
+       ncols := int(batch.NumCols())
+       nrows := batch.NumRows()
+       newCols := make([]arrow.Array, ncols)
+
+       // Resolve column indices by name; -1 if not present.
+       rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+       seqNumIndices := schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+       rowIDColIdx := -1
+       if len(rowIDIndices) > 0 {
+               rowIDColIdx = rowIDIndices[0]
+       }
+       seqNumColIdx := -1
+       if len(seqNumIndices) > 0 {
+               seqNumColIdx = seqNumIndices[0]
+       }
+
+       for i := 0; i < ncols; i++ {
+               if i == rowIDColIdx && task.FirstRowID != nil {
+                       // _row_id: inherit first_row_id + row_position when null; else keep value from file.
+                       col := batch.Column(i).(*array.Int64)

Review Comment:
   What if this column is not int64? The assertion would panic; I think it is
   worth adding a check.
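
   A minimal sketch of such a check using Go's comma-ok type assertion. To
   stay self-contained it uses hypothetical stand-in types (column,
   int64Column, stringColumn) rather than the Arrow arrays; in the PR the
   same shape would apply to the batch.Column(i).(*array.Int64) assertion:

```go
package main

import "fmt"

// column is a hypothetical stand-in for arrow.Array.
type column interface{ Len() int }

// int64Column and stringColumn are hypothetical stand-ins for concrete array types.
type int64Column struct{ vals []int64 }
type stringColumn struct{ vals []string }

func (c *int64Column) Len() int  { return len(c.vals) }
func (c *stringColumn) Len() int { return len(c.vals) }

// asInt64Column returns an error instead of panicking when the column
// does not have the expected concrete type.
func asInt64Column(name string, c column) (*int64Column, error) {
	col, ok := c.(*int64Column)
	if !ok {
		return nil, fmt.Errorf("column %q: expected int64 column, got %T", name, c)
	}
	return col, nil
}

func main() {
	good, err := asInt64Column("_row_id", &int64Column{vals: []int64{1, 2}})
	fmt.Println(good.Len(), err) // 2 <nil>

	_, err = asInt64Column("_row_id", &stringColumn{vals: []string{"x"}})
	fmt.Println(err != nil) // true
}
```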



##########
manifest.go:
##########
@@ -759,6 +771,19 @@ func (c *ManifestReader) ReadEntry() (ManifestEntry, error) {
                tmp = tmp.(*fallbackManifestEntry).toEntry()
        }
        tmp.inherit(c.file)
        // Apply first_row_id inheritance for v3 data manifests (spec: First Row ID Inheritance).
+       if c.content == ManifestContentData && c.file.FirstRowId() != nil {

Review Comment:
   The Java idAssigner always advances nextRowId += file.recordCount() for
   every file, whether its first_row_id is null or explicit. The Go
   implementation only advances when FirstRowIDField == nil, so any
   explicit-first_row_id file in the manifest silently resets the baseline
   for subsequent null files, producing overlapping row ID ranges.
   The fix and the cleanup land together: initialize nextFirstRowID eagerly
   in NewManifestReader (eliminating the *int64 sentinel entirely), then
   unconditionally advance after the conditional assign:
   
   ```go
     if r.inheritRowIDs {
         if df, ok := tmp.DataFile().(*dataFile); ok {
             if df.FirstRowIDField == nil {
                 id := r.nextFirstRowID
                 df.FirstRowIDField = &id
             }
             r.nextFirstRowID += df.Count() // always advance, matches Java semantics
         }
     }
   ```
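
   A small worked example of the overlap, assuming a manifest whose
   first_row_id is 0 and three files: a null-ID file with 10 rows, an explicit
   first_row_id=10 file with 5 rows, and a null-ID file with 3 rows. The file
   type and the assign and ptr helpers are hypothetical, written only to
   compare the two advancing rules:

```go
package main

import "fmt"

// file is a hypothetical stand-in for a data file entry in a manifest.
type file struct {
	firstRowID *int64 // nil means "inherit from the manifest"
	count      int64
}

func ptr(v int64) *int64 { return &v }

// assign returns the effective first_row_id of each file. With
// alwaysAdvance=false the counter moves only for null-ID files (the behavior
// flagged above); with alwaysAdvance=true it moves for every file.
func assign(base int64, files []file, alwaysAdvance bool) []int64 {
	next := base
	out := make([]int64, len(files))
	for i, f := range files {
		if f.firstRowID == nil {
			out[i] = next
			next += f.count
			continue
		}
		out[i] = *f.firstRowID
		if alwaysAdvance {
			next += f.count
		}
	}
	return out
}

func main() {
	files := []file{
		{firstRowID: nil, count: 10},
		{firstRowID: ptr(10), count: 5},
		{firstRowID: nil, count: 3},
	}
	// Third file starts at 10 and collides with the second file's range [10, 15).
	fmt.Println(assign(0, files, false)) // [0 10 10]
	// Third file starts at 15, right after the second file's range.
	fmt.Println(assign(0, files, true)) // [0 10 15]
}
```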



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

