laskoviymishka commented on code in PR #735:
URL: https://github.com/apache/iceberg-go/pull/735#discussion_r2878121065
##########
manifest.go:
##########
@@ -1445,32 +1470,40 @@ func (m *ManifestListWriter) AddManifests(files
[]ManifestFile) error {
}
// WriteManifestList writes a list of manifest files to an avro file.
-func WriteManifestList(version int, out io.Writer, snapshotID int64,
parentSnapshotID, sequenceNumber *int64, firstRowId int64, files
[]ManifestFile) (err error) {
+// For v3 tables, it returns the next row ID after assigning first_row_id to
data manifests; otherwise 0.
+func WriteManifestList(version int, out io.Writer, snapshotID int64,
parentSnapshotID, sequenceNumber *int64, firstRowId int64, files
[]ManifestFile) (nextRowIDAfter int64, err error) {
Review Comment:
This is a public function. I would prefer to keep its signature as is and add a
separate WriteManifestV3 that returns the additional values.
##########
manifest.go:
##########
@@ -520,6 +524,9 @@ type ManifestFile interface {
// field in the spec. Each field in the list corresponds to a field in
// the manifest file's partition spec.
Partitions() []FieldSummary
+ // FirstRowId returns the first _row_id assigned to rows in this
manifest (v3+ data manifests only).
+ // Returns nil for v1/v2 or for delete manifests.
+ FirstRowId() *int64
Review Comment:
FirstRowId --> FirstRowID
Since this is a public API, renaming it later would be very hard, and our
other identifiers already use ID, not Id.
##########
table/scanner.go:
##########
@@ -466,12 +466,21 @@ func (scan *Scan) PlanFiles(ctx context.Context)
([]FileScanTask, error) {
if err != nil {
return nil, err
}
- results = append(results, FileScanTask{
+ task := FileScanTask{
File: e.DataFile(),
DeleteFiles: deleteFiles,
Start: 0,
Length: e.DataFile().FileSizeBytes(),
- })
+ }
+ // Row lineage constants for v3: readers use these to
synthesize _row_id and _last_updated_sequence_number.
+ if scan.metadata.Version() >= 3 {
+ task.FirstRowID = e.DataFile().FirstRowID()
+ seq := e.SequenceNum()
Review Comment:
I think this should be FileSequenceNum, not SequenceNum.
Something like:
```go
if fseq := e.FileSequenceNum(); fseq != nil {
task.DataSequenceNumber = fseq
}
```
##########
table/arrow_scanner.go:
##########
@@ -340,6 +340,80 @@ func (as *arrowScan) getRecordFilter(ctx context.Context,
fileSchema *iceberg.Sc
return nil, false, nil
}
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema).
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is
kept.
+// rowOffset is the 0-based row index within the current file and is updated
so _row_id stays
+// correct across multiple batches from the same file (first_row_id +
row_position).
+func synthesizeRowLineageColumns(
+ ctx context.Context,
+ rowOffset *int64,
+ task FileScanTask,
+ batch arrow.RecordBatch,
+ _ *iceberg.Schema,
Review Comment:
Why is this needed?
##########
manifest.go:
##########
@@ -595,6 +602,11 @@ type ManifestReader struct {
schemaLoaded bool
partitionSpec PartitionSpec
partitionSpecLoaded bool
+
+ // nextFirstRowID tracks the next first_row_id to assign when reading
v3 data
+ // manifests; used for First Row ID inheritance (null data file
first_row_id
+ // gets manifest's first_row_id + sum of preceding null-first_row_id
files' record_count).
+ nextFirstRowID *int64
Review Comment:
I think this *int64 optional pattern makes the whole code much more cumbersome.
Can we fall back to either just an int64, or a defined pair of int64 + bool?
The *int64 pattern does make sense in the rest of the codebase for
Avro-serialized fields that need to round-trip as nullable (e.g.
FirstRowIDField *int64 on dataFile). But for a private tracking variable inside
ManifestReader, it's the wrong tool: you want a clear "am I even doing this?"
flag, not pointer indirection.
##########
table/arrow_scanner.go:
##########
@@ -340,6 +340,80 @@ func (as *arrowScan) getRecordFilter(ctx context.Context,
fileSchema *iceberg.Sc
return nil, false, nil
}
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema).
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is
kept.
+// rowOffset is the 0-based row index within the current file and is updated
so _row_id stays
+// correct across multiple batches from the same file (first_row_id +
row_position).
+func synthesizeRowLineageColumns(
+ ctx context.Context,
+ rowOffset *int64,
+ task FileScanTask,
+ batch arrow.RecordBatch,
+ _ *iceberg.Schema,
+ _ bool,
+) (arrow.RecordBatch, error) {
+ alloc := compute.GetAllocator(ctx)
+ schema := batch.Schema()
+ ncols := int(batch.NumCols())
+ nrows := batch.NumRows()
+ newCols := make([]arrow.Array, ncols)
+
+ // Resolve column indices by name; -1 if not present.
+ rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+ seqNumIndices :=
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+ rowIDColIdx := -1
+ if len(rowIDIndices) > 0 {
+ rowIDColIdx = rowIDIndices[0]
+ }
+ seqNumColIdx := -1
+ if len(seqNumIndices) > 0 {
+ seqNumColIdx = seqNumIndices[0]
+ }
+
+ for i := 0; i < ncols; i++ {
+ if i == rowIDColIdx && task.FirstRowID != nil {
+ // _row_id: inherit first_row_id + row_position when
null; else keep value from file.
+ col := batch.Column(i).(*array.Int64)
Review Comment:
What if this is not an int64 column? The type assertion would panic, so I think
it is worth adding a check.
##########
manifest.go:
##########
@@ -759,6 +771,19 @@ func (c *ManifestReader) ReadEntry() (ManifestEntry,
error) {
tmp = tmp.(*fallbackManifestEntry).toEntry()
}
tmp.inherit(c.file)
+ // Apply first_row_id inheritance for v3 data manifests (spec: First
Row ID Inheritance).
+ if c.content == ManifestContentData && c.file.FirstRowId() != nil {
Review Comment:
The Java idAssigner always advances nextRowId += file.recordCount() for
every file — null or explicit. The Go implementation only advances when
FirstRowIDField == nil, so any explicit-first_row_id file in the manifest
silently resets the baseline for subsequent null files, producing overlapping
row ID ranges.
The fix and the cleanup land together: initialize nextFirstRowID eagerly in
NewManifestReader (eliminating the *int64 sentinel entirely), then
unconditionally advance after the conditional assign:
```go
if r.inheritRowIDs {
if df, ok := tmp.DataFile().(*dataFile); ok {
if df.FirstRowIDField == nil {
id := r.nextFirstRowID
df.FirstRowIDField = &id
}
r.nextFirstRowID += df.Count() // always advance, matches Java
semantics
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]