Elbehery commented on code in PR #497:
URL: https://github.com/apache/iceberg-go/pull/497#discussion_r2231666673


##########
table/table_test.go:
##########
@@ -1243,8 +2296,8 @@ func TestNullableStructRequiredField(t *testing.T) {
        sc, err := table.ArrowSchemaToIcebergWithFreshIDs(arrowSchema, false)
        require.NoError(t, err)
 
-       require.NoError(t, cat.CreateNamespace(t.Context(), 
table.Identifier{"testing"}, nil))
-       tbl, err := cat.CreateTable(t.Context(), table.Identifier{"testing", 
"nullable_struct_required_field"}, sc,
+       require.NoError(t, cat.CreateNamespace(context.Background(), 
table.Identifier{"testing"}, nil))

Review Comment:
   done 👍🏽 



##########
table/table_test.go:
##########
@@ -1202,6 +1202,1059 @@ func TestTableWriting(t *testing.T) {
        suite.Run(t, &TableWritingTestSuite{formatVersion: 2})
 }
 
+// WriteOperationsTestSuite provides comprehensive testing for advanced write operations
+type WriteOperationsTestSuite struct {
+       suite.Suite
+       ctx         context.Context
+       location    string
+       tableSchema *iceberg.Schema
+       arrSchema   *arrow.Schema
+       arrTable    arrow.Table
+}
+
+func TestWriteOperations(t *testing.T) {
+       suite.Run(t, new(WriteOperationsTestSuite))
+}
+
+func (s *WriteOperationsTestSuite) SetupSuite() {
+       s.ctx = context.Background()
+       mem := memory.DefaultAllocator
+
+       // Create a test schema
+       s.tableSchema = iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 3, Name: "ts", Type: 
iceberg.PrimitiveTypes.Timestamp})
+
+       s.arrSchema = arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+               {Name: "ts", Type: &arrow.TimestampType{Unit: 
arrow.Microsecond}, Nullable: true},
+       }, nil)
+
+       // Create test data
+       var err error
+       s.arrTable, err = array.TableFromJSON(mem, s.arrSchema, []string{
+               `[
+                       {"id": 1, "data": "foo", "ts": 1672531200000000},
+                       {"id": 2, "data": "bar", "ts": 1672534800000000},
+                       {"id": 3, "data": "baz", "ts": 1672538400000000}
+               ]`,
+       })
+       s.Require().NoError(err)
+}
+
+func (s *WriteOperationsTestSuite) SetupTest() {
+       s.location = filepath.ToSlash(strings.Replace(s.T().TempDir(), "#", "", 
-1))
+}
+
+func (s *WriteOperationsTestSuite) TearDownSuite() {
+       s.arrTable.Release()
+}
+
+func (s *WriteOperationsTestSuite) getMetadataLoc() string {
+       return fmt.Sprintf("%s/metadata/%05d-%s.metadata.json",
+               s.location, 1, uuid.New().String())
+}
+
+func (s *WriteOperationsTestSuite) writeParquet(fio iceio.WriteFileIO, 
filePath string, arrTbl arrow.Table) {
+       fo, err := fio.Create(filePath)
+       s.Require().NoError(err)
+
+       s.Require().NoError(pqarrow.WriteTable(arrTbl, fo, arrTbl.NumRows(),
+               nil, pqarrow.DefaultWriterProps()))
+}
+
+func (s *WriteOperationsTestSuite) createTable(identifier table.Identifier, 
formatVersion int, spec iceberg.PartitionSpec) *table.Table {
+       meta, err := table.NewMetadata(s.tableSchema, &spec, 
table.UnsortedSortOrder,
+               s.location, iceberg.Properties{"format-version": 
strconv.Itoa(formatVersion)})
+       s.Require().NoError(err)
+
+       return table.New(
+               identifier,
+               meta,
+               s.getMetadataLoc(),
+               func(ctx context.Context) (iceio.IO, error) {
+                       return iceio.LocalFS{}, nil
+               },
+               &mockedCatalog{},
+       )
+}
+
+func (s *WriteOperationsTestSuite) createTableWithData(identifier 
table.Identifier, numFiles int) (*table.Table, []string) {
+       tbl := s.createTable(identifier, 2, *iceberg.UnpartitionedSpec)
+
+       files := make([]string, 0, numFiles)
+       fs := s.getFS(tbl)
+
+       for i := 0; i < numFiles; i++ {
+               filePath := fmt.Sprintf("%s/data/test-%d.parquet", s.location, 
i)
+               s.writeParquet(fs, filePath, s.arrTable)
+               files = append(files, filePath)
+       }
+
+       // Add files to table
+       tx := tbl.NewTransaction()
+       s.Require().NoError(tx.AddFiles(s.ctx, files, nil, false))
+
+       committedTbl, err := tx.Commit(s.ctx)
+       s.Require().NoError(err)
+
+       return committedTbl, files
+}
+
+func (s *WriteOperationsTestSuite) getFS(tbl *table.Table) iceio.WriteFileIO {
+       fs, err := tbl.FS(s.ctx)
+       s.Require().NoError(err)
+
+       return fs.(iceio.WriteFileIO)
+}
+
+// =============================================================================
+// SNAPSHOT VALIDATION HELPERS
+// =============================================================================
+
+// validateSnapshotFiles checks that the snapshot contains exactly the expected files
+func (s *WriteOperationsTestSuite) validateSnapshotFiles(snapshot 
*table.Snapshot, fs iceio.IO, expectedFiles []string) {
+       s.Require().NotNil(snapshot, "Snapshot should not be nil")
+
+       // Get actual files from snapshot using public API
+       manifests, err := snapshot.Manifests(fs)
+       s.Require().NoError(err, "Failed to read manifests from snapshot")
+
+       var actualFiles []string
+       for _, manifest := range manifests {
+               entries, err := manifest.FetchEntries(fs, false)
+               s.Require().NoError(err, "Failed to fetch entries from 
manifest")
+
+               for _, entry := range entries {
+                       // Only include data files (not delete files)
+                       if entry.DataFile().ContentType() == 
iceberg.EntryContentData {
+                               actualFiles = append(actualFiles, 
entry.DataFile().FilePath())
+                       }
+               }
+       }
+
+       // Sort for comparison
+       expectedSorted := make([]string, len(expectedFiles))
+       copy(expectedSorted, expectedFiles)
+       slices.Sort(expectedSorted)
+       slices.Sort(actualFiles)
+
+       s.Equal(len(expectedSorted), len(actualFiles), "File count mismatch - 
expected %d files, got %d", len(expectedSorted), len(actualFiles))
+       s.Equal(expectedSorted, actualFiles, "File paths don't match 
expected.\nExpected: %v\nActual: %v", expectedSorted, actualFiles)
+
+       s.T().Logf("Snapshot file validation passed: %d files match expected", 
len(actualFiles))
+}
+
+// validateSnapshotSummary checks operation type and summary properties
+func (s *WriteOperationsTestSuite) validateSnapshotSummary(snapshot 
*table.Snapshot, expectedOp table.Operation, expectedCounts map[string]string) {
+       s.Require().NotNil(snapshot, "Snapshot should not be nil")
+       s.Require().NotNil(snapshot.Summary, "Snapshot summary should not be 
nil")
+
+       s.Equal(expectedOp, snapshot.Summary.Operation, "Snapshot operation 
mismatch")
+
+       if expectedCounts != nil {
+               for key, expectedValue := range expectedCounts {
+                       actualValue, exists := snapshot.Summary.Properties[key]
+                       s.True(exists, "Summary property %s should exist", key)
+                       s.Equal(expectedValue, actualValue, "Summary property 
%s mismatch - expected %s, got %s", key, expectedValue, actualValue)
+               }
+       }
+
+       s.T().Logf("Snapshot summary validation passed: operation=%s, 
properties=%d", expectedOp, len(expectedCounts))
+}
+
+// validateManifestStructure validates manifest files and returns total entry count
+func (s *WriteOperationsTestSuite) validateManifestStructure(snapshot 
*table.Snapshot, fs iceio.IO) int {
+       s.Require().NotNil(snapshot, "Snapshot should not be nil")
+
+       manifests, err := snapshot.Manifests(fs)
+       s.Require().NoError(err, "Failed to read manifests from snapshot")
+       s.Greater(len(manifests), 0, "Should have at least one manifest")
+
+       totalEntries := 0
+       for i, manifest := range manifests {
+               // Validate manifest is readable
+               entries, err := manifest.FetchEntries(fs, false)
+               s.Require().NoError(err, "Failed to fetch entries from manifest 
%d", i)
+               totalEntries += len(entries)
+
+               // Validate manifest metadata
+               s.Greater(manifest.Length(), int64(0), "Manifest %d should have 
positive length", i)
+               s.NotEmpty(manifest.FilePath(), "Manifest %d should have valid 
path", i)
+
+               s.T().Logf("📄 Manifest %d: %s (%d entries, %d bytes)", i, 
manifest.FilePath(), len(entries), manifest.Length())
+       }
+
+       s.T().Logf("Manifest structure validation passed: %d manifests, %d 
total entries", len(manifests), totalEntries)
+
+       return totalEntries
+}
+
+// validateSnapshotState performs comprehensive validation of snapshot state
+func (s *WriteOperationsTestSuite) validateSnapshotState(snapshot 
*table.Snapshot, fs iceio.IO, expectedFiles []string, expectedOp 
table.Operation, expectedCounts map[string]string) {
+       s.T().Logf("🔍 Validating snapshot state (ID: %d)", snapshot.SnapshotID)
+
+       // Validate all components
+       s.validateSnapshotFiles(snapshot, fs, expectedFiles)
+       s.validateSnapshotSummary(snapshot, expectedOp, expectedCounts)
+       entryCount := s.validateManifestStructure(snapshot, fs)
+
+       // Ensure manifest entries match file count
+       s.Equal(len(expectedFiles), entryCount, "Manifest entry count should 
match expected file count")
+
+       s.T().Logf("🎉 Complete snapshot validation passed for snapshot %d", 
snapshot.SnapshotID)
+}
+
+// validateDataIntegrity scans the table and validates row count and basic data
+func (s *WriteOperationsTestSuite) validateDataIntegrity(tbl *table.Table, 
expectedRowCount int64) {
+       scan := tbl.Scan()
+       results, err := scan.ToArrowTable(s.ctx)
+       s.Require().NoError(err, "Failed to scan table for data integrity 
check")
+       defer results.Release()
+
+       s.Equal(expectedRowCount, results.NumRows(), "Row count mismatch - 
expected %d, got %d", expectedRowCount, results.NumRows())
+
+       // Basic data validation - ensure we can read the data
+       s.Equal(3, int(results.NumCols()), "Should have 3 columns (id, data, 
ts)")
+
+       s.T().Logf("Data integrity validation passed: %d rows, %d columns", 
results.NumRows(), results.NumCols())
+}
+
+// getSnapshotFiles extracts the list of data file paths from a snapshot for comparison
+func (s *WriteOperationsTestSuite) getSnapshotFiles(snapshot *table.Snapshot, 
fs iceio.IO) []string {
+       if snapshot == nil {
+               return []string{}
+       }
+
+       manifests, err := snapshot.Manifests(fs)
+       s.Require().NoError(err, "Failed to read manifests from snapshot")
+
+       var files []string
+       for _, manifest := range manifests {
+               entries, err := manifest.FetchEntries(fs, false)
+               s.Require().NoError(err, "Failed to fetch entries from 
manifest")
+
+               for _, entry := range entries {
+                       // Only include data files (not delete files)
+                       if entry.DataFile().ContentType() == 
iceberg.EntryContentData {
+                               files = append(files, 
entry.DataFile().FilePath())
+                       }
+               }
+       }
+
+       // Sort for consistent comparison
+       slices.Sort(files)
+
+       return files
+}
+
+// =============================================================================
+// TESTS WITH ENHANCED VALIDATION
+// =============================================================================
+
+func TestRewriteFiles(t *testing.T) {
+       suite.Run(t, &WriteOperationsTestSuite{})
+}
+
+func (s *WriteOperationsTestSuite) TestRewriteFiles() {
+       s.Run("RewriteDataFiles", func() {
+               // Setup table with multiple small files
+               ident := table.Identifier{"default", "rewrite_test_table"}
+               tbl, originalFiles := s.createTableWithData(ident, 3)
+
+               // VALIDATE INITIAL STATE
+               initialSnapshot := tbl.CurrentSnapshot()
+               s.validateSnapshotState(initialSnapshot, s.getFS(tbl), 
originalFiles, table.OpAppend, map[string]string{
+                       "added-data-files": "3",
+                       "added-records":    "9", // 3 files × 3 rows each
+               })
+               s.validateDataIntegrity(tbl, 9) // 3 files × 3 rows each
+
+               // Capture initial file list for comparison
+               initialFiles := s.getSnapshotFiles(initialSnapshot, 
s.getFS(tbl))
+
+               // Create new consolidated file
+               consolidatedPath := s.location + "/data/consolidated.parquet"
+
+               // Create larger dataset for consolidation
+               mem := memory.DefaultAllocator
+               largerTable, err := array.TableFromJSON(mem, s.arrSchema, 
[]string{
+                       `[
+                               {"id": 1, "data": "foo", "ts": 
1672531200000000},
+                               {"id": 2, "data": "bar", "ts": 
1672534800000000},
+                               {"id": 3, "data": "baz", "ts": 
1672538400000000},
+                               {"id": 4, "data": "qux", "ts": 
1672542000000000},
+                               {"id": 5, "data": "quux", "ts": 
1672545600000000}
+                       ]`,
+               })
+               s.Require().NoError(err)
+               defer largerTable.Release()
+
+               fs := s.getFS(tbl)
+               s.writeParquet(fs, consolidatedPath, largerTable)
+
+               // Rewrite files (replace multiple small files with one larger 
file)
+               tx := tbl.NewTransaction()
+               err = tx.ReplaceDataFiles(s.ctx, originalFiles, 
[]string{consolidatedPath}, nil)
+               s.Require().NoError(err)
+
+               newTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+
+               // VALIDATE FINAL STATE WITH ENHANCED CHECKS
+               finalSnapshot := newTbl.CurrentSnapshot()
+
+               // Assert that file lists differ before and after the operation
+               finalFiles := s.getSnapshotFiles(finalSnapshot, s.getFS(newTbl))
+               s.NotEqual(initialFiles, finalFiles, "File lists should differ 
before and after rewrite operation")
+               s.Greater(len(finalFiles), len(initialFiles), "Rewrite 
operation should result in more files (current behavior)")
+
+               // NOTE: Current ReplaceDataFiles implementation keeps both old 
and new files
+               // In a full implementation, it should only contain the 
consolidated file
+               var allFiles []string
+               manifests, err := finalSnapshot.Manifests(s.getFS(newTbl))
+               s.Require().NoError(err)
+               for _, manifest := range manifests {
+                       entries, err := manifest.FetchEntries(s.getFS(newTbl), 
false)
+                       s.Require().NoError(err)
+                       for _, entry := range entries {
+                               if entry.DataFile().ContentType() == 
iceberg.EntryContentData {
+                                       allFiles = append(allFiles, 
entry.DataFile().FilePath())
+                               }
+                       }
+               }
+
+               // Current behavior: keeps both original and consolidated files
+               expectedFiles := append(originalFiles, consolidatedPath)
+               s.validateSnapshotState(finalSnapshot, s.getFS(newTbl), 
expectedFiles, table.OpOverwrite, map[string]string{
+                       "added-data-files": "1",
+                       "added-records":    "5",
+               })
+
+               // Total data should be correct regardless of file handling
+               s.validateDataIntegrity(newTbl, 5) // consolidated data
+
+               // Verify snapshot progression
+               s.NotEqual(initialSnapshot.SnapshotID, 
finalSnapshot.SnapshotID, "Should create new snapshot")
+               s.Equal(&initialSnapshot.SnapshotID, 
finalSnapshot.ParentSnapshotID, "Should reference previous snapshot as parent")
+
+               s.T().Log("NOTE: ReplaceDataFiles currently keeps both old and 
new files")
+               s.T().Log("EXPECTED: In full implementation, should only 
contain consolidated file")
+       })
+
+       s.Run("RewriteWithConflictDetection", func() {
+               // Test concurrent rewrite operations
+               ident := table.Identifier{"default", "rewrite_conflict_test"}
+               tbl, originalFiles := s.createTableWithData(ident, 2)
+
+               // VALIDATE INITIAL STATE
+               initialSnapshot := tbl.CurrentSnapshot()
+               s.validateSnapshotState(initialSnapshot, s.getFS(tbl), 
originalFiles, table.OpAppend, map[string]string{
+                       "added-data-files": "2",
+                       "added-records":    "6", // 2 files × 3 rows each
+               })
+
+               // Start first transaction
+               tx1 := tbl.NewTransaction()
+               consolidatedPath1 := s.location + "/data/consolidated1.parquet"
+               fs := s.getFS(tbl)
+               s.writeParquet(fs, consolidatedPath1, s.arrTable)
+
+               // Start second transaction
+               tx2 := tbl.NewTransaction()
+               consolidatedPath2 := s.location + "/data/consolidated2.parquet"
+               s.writeParquet(fs, consolidatedPath2, s.arrTable)
+
+               // Both try to replace the same files
+               err1 := tx1.ReplaceDataFiles(s.ctx, originalFiles, 
[]string{consolidatedPath1}, nil)
+               s.Require().NoError(err1)
+
+               err2 := tx2.ReplaceDataFiles(s.ctx, originalFiles, 
[]string{consolidatedPath2}, nil)
+               s.Require().NoError(err2)
+
+               // First should succeed
+               firstTbl, err1 := tx1.Commit(s.ctx)
+               s.Require().NoError(err1)
+
+               // VALIDATE FIRST TRANSACTION STATE
+               firstSnapshot := firstTbl.CurrentSnapshot()
+
+               // Current behavior: keeps both original and new files
+               expectedFirstFiles := append(originalFiles, consolidatedPath1)
+               s.validateSnapshotState(firstSnapshot, s.getFS(firstTbl), 
expectedFirstFiles, table.OpOverwrite, map[string]string{
+                       "added-data-files": "1",
+               })
+
+               // Second should succeed since conflict detection may not be 
fully implemented yet
+               // In a full implementation, this would fail due to conflict
+               _, err2 = tx2.Commit(s.ctx)
+               if err2 != nil {
+                       s.T().Logf("Transaction conflict detected: %v", err2)
+               } else {
+                       s.T().Log("Transaction completed without conflict - 
conflict detection may not be fully implemented")
+               }
+
+               s.T().Log("NOTE: ReplaceDataFiles currently keeps both old and 
new files")
+               s.T().Log("EXPECTED: In full implementation, should replace 
files completely")
+       })
+}
+
+func (s *WriteOperationsTestSuite) TestOverwriteFiles() {
+       s.Run("OverwriteByPartition", func() {

Review Comment:
   done 👍🏽 



##########
table/table_test.go:
##########
@@ -1202,6 +1202,1059 @@ func TestTableWriting(t *testing.T) {
        suite.Run(t, &TableWritingTestSuite{formatVersion: 2})
 }
 
+// WriteOperationsTestSuite provides comprehensive testing for advanced write operations
+type WriteOperationsTestSuite struct {
+       suite.Suite
+       ctx         context.Context
+       location    string
+       tableSchema *iceberg.Schema
+       arrSchema   *arrow.Schema
+       arrTable    arrow.Table
+}
+
+func TestWriteOperations(t *testing.T) {
+       suite.Run(t, new(WriteOperationsTestSuite))
+}
+
+func (s *WriteOperationsTestSuite) SetupSuite() {
+       s.ctx = context.Background()
+       mem := memory.DefaultAllocator
+
+       // Create a test schema
+       s.tableSchema = iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 3, Name: "ts", Type: 
iceberg.PrimitiveTypes.Timestamp})
+
+       s.arrSchema = arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+               {Name: "ts", Type: &arrow.TimestampType{Unit: 
arrow.Microsecond}, Nullable: true},
+       }, nil)
+
+       // Create test data
+       var err error
+       s.arrTable, err = array.TableFromJSON(mem, s.arrSchema, []string{
+               `[
+                       {"id": 1, "data": "foo", "ts": 1672531200000000},
+                       {"id": 2, "data": "bar", "ts": 1672534800000000},
+                       {"id": 3, "data": "baz", "ts": 1672538400000000}
+               ]`,
+       })
+       s.Require().NoError(err)
+}
+
+func (s *WriteOperationsTestSuite) SetupTest() {
+       s.location = filepath.ToSlash(strings.Replace(s.T().TempDir(), "#", "", 
-1))
+}
+
+func (s *WriteOperationsTestSuite) TearDownSuite() {
+       s.arrTable.Release()
+}
+
+func (s *WriteOperationsTestSuite) getMetadataLoc() string {
+       return fmt.Sprintf("%s/metadata/%05d-%s.metadata.json",
+               s.location, 1, uuid.New().String())
+}
+
+func (s *WriteOperationsTestSuite) writeParquet(fio iceio.WriteFileIO, 
filePath string, arrTbl arrow.Table) {
+       fo, err := fio.Create(filePath)
+       s.Require().NoError(err)
+
+       s.Require().NoError(pqarrow.WriteTable(arrTbl, fo, arrTbl.NumRows(),
+               nil, pqarrow.DefaultWriterProps()))
+}
+
+func (s *WriteOperationsTestSuite) createTable(identifier table.Identifier, 
formatVersion int, spec iceberg.PartitionSpec) *table.Table {
+       meta, err := table.NewMetadata(s.tableSchema, &spec, 
table.UnsortedSortOrder,
+               s.location, iceberg.Properties{"format-version": 
strconv.Itoa(formatVersion)})
+       s.Require().NoError(err)
+
+       return table.New(
+               identifier,
+               meta,
+               s.getMetadataLoc(),
+               func(ctx context.Context) (iceio.IO, error) {
+                       return iceio.LocalFS{}, nil
+               },
+               &mockedCatalog{},
+       )
+}
+
+func (s *WriteOperationsTestSuite) createTableWithData(identifier 
table.Identifier, numFiles int) (*table.Table, []string) {
+       tbl := s.createTable(identifier, 2, *iceberg.UnpartitionedSpec)
+
+       files := make([]string, 0, numFiles)
+       fs := s.getFS(tbl)
+
+       for i := 0; i < numFiles; i++ {
+               filePath := fmt.Sprintf("%s/data/test-%d.parquet", s.location, 
i)
+               s.writeParquet(fs, filePath, s.arrTable)
+               files = append(files, filePath)
+       }
+
+       // Add files to table
+       tx := tbl.NewTransaction()
+       s.Require().NoError(tx.AddFiles(s.ctx, files, nil, false))
+
+       committedTbl, err := tx.Commit(s.ctx)
+       s.Require().NoError(err)
+
+       return committedTbl, files
+}
+
+func (s *WriteOperationsTestSuite) getFS(tbl *table.Table) iceio.WriteFileIO {
+       fs, err := tbl.FS(s.ctx)
+       s.Require().NoError(err)
+
+       return fs.(iceio.WriteFileIO)
+}
+
+// =============================================================================
+// SNAPSHOT VALIDATION HELPERS
+// =============================================================================
+
+// validateSnapshotFiles checks that the snapshot contains exactly the expected files
+func (s *WriteOperationsTestSuite) validateSnapshotFiles(snapshot 
*table.Snapshot, fs iceio.IO, expectedFiles []string) {
+       s.Require().NotNil(snapshot, "Snapshot should not be nil")
+
+       // Get actual files from snapshot using public API
+       manifests, err := snapshot.Manifests(fs)
+       s.Require().NoError(err, "Failed to read manifests from snapshot")
+
+       var actualFiles []string
+       for _, manifest := range manifests {
+               entries, err := manifest.FetchEntries(fs, false)
+               s.Require().NoError(err, "Failed to fetch entries from 
manifest")
+
+               for _, entry := range entries {
+                       // Only include data files (not delete files)
+                       if entry.DataFile().ContentType() == 
iceberg.EntryContentData {
+                               actualFiles = append(actualFiles, 
entry.DataFile().FilePath())
+                       }
+               }
+       }
+
+       // Sort for comparison
+       expectedSorted := make([]string, len(expectedFiles))
+       copy(expectedSorted, expectedFiles)
+       slices.Sort(expectedSorted)
+       slices.Sort(actualFiles)
+
+       s.Equal(len(expectedSorted), len(actualFiles), "File count mismatch - 
expected %d files, got %d", len(expectedSorted), len(actualFiles))
+       s.Equal(expectedSorted, actualFiles, "File paths don't match 
expected.\nExpected: %v\nActual: %v", expectedSorted, actualFiles)
+
+       s.T().Logf("Snapshot file validation passed: %d files match expected", 
len(actualFiles))
+}
+
+// validateSnapshotSummary checks operation type and summary properties
+func (s *WriteOperationsTestSuite) validateSnapshotSummary(snapshot 
*table.Snapshot, expectedOp table.Operation, expectedCounts map[string]string) {
+       s.Require().NotNil(snapshot, "Snapshot should not be nil")
+       s.Require().NotNil(snapshot.Summary, "Snapshot summary should not be 
nil")
+
+       s.Equal(expectedOp, snapshot.Summary.Operation, "Snapshot operation 
mismatch")
+
+       if expectedCounts != nil {
+               for key, expectedValue := range expectedCounts {
+                       actualValue, exists := snapshot.Summary.Properties[key]
+                       s.True(exists, "Summary property %s should exist", key)
+                       s.Equal(expectedValue, actualValue, "Summary property 
%s mismatch - expected %s, got %s", key, expectedValue, actualValue)
+               }
+       }
+
+       s.T().Logf("Snapshot summary validation passed: operation=%s, 
properties=%d", expectedOp, len(expectedCounts))
+}
+
+// validateManifestStructure validates manifest files and returns total entry count
+func (s *WriteOperationsTestSuite) validateManifestStructure(snapshot 
*table.Snapshot, fs iceio.IO) int {
+       s.Require().NotNil(snapshot, "Snapshot should not be nil")
+
+       manifests, err := snapshot.Manifests(fs)
+       s.Require().NoError(err, "Failed to read manifests from snapshot")
+       s.Greater(len(manifests), 0, "Should have at least one manifest")
+
+       totalEntries := 0
+       for i, manifest := range manifests {
+               // Validate manifest is readable
+               entries, err := manifest.FetchEntries(fs, false)
+               s.Require().NoError(err, "Failed to fetch entries from manifest 
%d", i)
+               totalEntries += len(entries)
+
+               // Validate manifest metadata
+               s.Greater(manifest.Length(), int64(0), "Manifest %d should have 
positive length", i)
+               s.NotEmpty(manifest.FilePath(), "Manifest %d should have valid 
path", i)
+
+               s.T().Logf("📄 Manifest %d: %s (%d entries, %d bytes)", i, 
manifest.FilePath(), len(entries), manifest.Length())
+       }
+
+       s.T().Logf("Manifest structure validation passed: %d manifests, %d 
total entries", len(manifests), totalEntries)
+
+       return totalEntries
+}
+
+// validateSnapshotState performs comprehensive validation of snapshot state
+func (s *WriteOperationsTestSuite) validateSnapshotState(snapshot 
*table.Snapshot, fs iceio.IO, expectedFiles []string, expectedOp 
table.Operation, expectedCounts map[string]string) {
+       s.T().Logf("🔍 Validating snapshot state (ID: %d)", snapshot.SnapshotID)
+
+       // Validate all components
+       s.validateSnapshotFiles(snapshot, fs, expectedFiles)
+       s.validateSnapshotSummary(snapshot, expectedOp, expectedCounts)
+       entryCount := s.validateManifestStructure(snapshot, fs)
+
+       // Ensure manifest entries match file count
+       s.Equal(len(expectedFiles), entryCount, "Manifest entry count should 
match expected file count")
+
+       s.T().Logf("🎉 Complete snapshot validation passed for snapshot %d", 
snapshot.SnapshotID)
+}
+
+// validateDataIntegrity scans the table and validates row count and basic data
+func (s *WriteOperationsTestSuite) validateDataIntegrity(tbl *table.Table, 
expectedRowCount int64) {
+       scan := tbl.Scan()
+       results, err := scan.ToArrowTable(s.ctx)
+       s.Require().NoError(err, "Failed to scan table for data integrity 
check")
+       defer results.Release()
+
+       s.Equal(expectedRowCount, results.NumRows(), "Row count mismatch - 
expected %d, got %d", expectedRowCount, results.NumRows())
+
+       // Basic data validation - ensure we can read the data
+       s.Equal(3, int(results.NumCols()), "Should have 3 columns (id, data, 
ts)")
+
+       s.T().Logf("Data integrity validation passed: %d rows, %d columns", 
results.NumRows(), results.NumCols())
+}
+
+// getSnapshotFiles extracts the list of data file paths from a snapshot for comparison
+func (s *WriteOperationsTestSuite) getSnapshotFiles(snapshot *table.Snapshot, 
fs iceio.IO) []string {
+       if snapshot == nil {
+               return []string{}
+       }
+
+       manifests, err := snapshot.Manifests(fs)
+       s.Require().NoError(err, "Failed to read manifests from snapshot")
+
+       var files []string
+       for _, manifest := range manifests {
+               entries, err := manifest.FetchEntries(fs, false)
+               s.Require().NoError(err, "Failed to fetch entries from 
manifest")
+
+               for _, entry := range entries {
+                       // Only include data files (not delete files)
+                       if entry.DataFile().ContentType() == 
iceberg.EntryContentData {
+                               files = append(files, 
entry.DataFile().FilePath())
+                       }
+               }
+       }
+
+       // Sort for consistent comparison
+       slices.Sort(files)
+
+       return files
+}
+
+// =============================================================================
+// TESTS WITH ENHANCED VALIDATION
+// =============================================================================
+
+func TestRewriteFiles(t *testing.T) {
+       suite.Run(t, &WriteOperationsTestSuite{})
+}
+
+func (s *WriteOperationsTestSuite) TestRewriteFiles() {
+       s.Run("RewriteDataFiles", func() {
+               // Setup table with multiple small files
+               ident := table.Identifier{"default", "rewrite_test_table"}
+               tbl, originalFiles := s.createTableWithData(ident, 3)
+
+               // VALIDATE INITIAL STATE
+               initialSnapshot := tbl.CurrentSnapshot()
+               s.validateSnapshotState(initialSnapshot, s.getFS(tbl), 
originalFiles, table.OpAppend, map[string]string{
+                       "added-data-files": "3",
+                       "added-records":    "9", // 3 files × 3 rows each
+               })
+               s.validateDataIntegrity(tbl, 9) // 3 files × 3 rows each
+
+               // Capture initial file list for comparison
+               initialFiles := s.getSnapshotFiles(initialSnapshot, 
s.getFS(tbl))
+
+               // Create new consolidated file
+               consolidatedPath := s.location + "/data/consolidated.parquet"
+
+               // Create larger dataset for consolidation
+               mem := memory.DefaultAllocator
+               largerTable, err := array.TableFromJSON(mem, s.arrSchema, 
[]string{
+                       `[
+                               {"id": 1, "data": "foo", "ts": 
1672531200000000},
+                               {"id": 2, "data": "bar", "ts": 
1672534800000000},
+                               {"id": 3, "data": "baz", "ts": 
1672538400000000},
+                               {"id": 4, "data": "qux", "ts": 
1672542000000000},
+                               {"id": 5, "data": "quux", "ts": 
1672545600000000}
+                       ]`,
+               })
+               s.Require().NoError(err)
+               defer largerTable.Release()
+
+               fs := s.getFS(tbl)
+               s.writeParquet(fs, consolidatedPath, largerTable)
+
+               // Rewrite files (replace multiple small files with one larger 
file)
+               tx := tbl.NewTransaction()
+               err = tx.ReplaceDataFiles(s.ctx, originalFiles, 
[]string{consolidatedPath}, nil)
+               s.Require().NoError(err)
+
+               newTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+
+               // VALIDATE FINAL STATE WITH ENHANCED CHECKS
+               finalSnapshot := newTbl.CurrentSnapshot()
+
+               // Assert that file lists differ before and after the operation
+               finalFiles := s.getSnapshotFiles(finalSnapshot, s.getFS(newTbl))
+               s.NotEqual(initialFiles, finalFiles, "File lists should differ 
before and after rewrite operation")
+               s.Greater(len(finalFiles), len(initialFiles), "Rewrite 
operation should result in more files (current behavior)")
+
+               // NOTE: Current ReplaceDataFiles implementation keeps both old 
and new files
+               // In a full implementation, it should only contain the 
consolidated file
+               var allFiles []string
+               manifests, err := finalSnapshot.Manifests(s.getFS(newTbl))
+               s.Require().NoError(err)
+               for _, manifest := range manifests {
+                       entries, err := manifest.FetchEntries(s.getFS(newTbl), 
false)
+                       s.Require().NoError(err)
+                       for _, entry := range entries {
+                               if entry.DataFile().ContentType() == 
iceberg.EntryContentData {
+                                       allFiles = append(allFiles, 
entry.DataFile().FilePath())
+                               }
+                       }
+               }
+
+               // Current behavior: keeps both original and consolidated files
+               expectedFiles := append(originalFiles, consolidatedPath)
+               s.validateSnapshotState(finalSnapshot, s.getFS(newTbl), 
expectedFiles, table.OpOverwrite, map[string]string{
+                       "added-data-files": "1",
+                       "added-records":    "5",
+               })
+
+               // Total data should be correct regardless of file handling
+               s.validateDataIntegrity(newTbl, 5) // consolidated data
+
+               // Verify snapshot progression
+               s.NotEqual(initialSnapshot.SnapshotID, 
finalSnapshot.SnapshotID, "Should create new snapshot")
+               s.Equal(&initialSnapshot.SnapshotID, 
finalSnapshot.ParentSnapshotID, "Should reference previous snapshot as parent")
+
+               s.T().Log("NOTE: ReplaceDataFiles currently keeps both old and 
new files")
+               s.T().Log("EXPECTED: In full implementation, should only 
contain consolidated file")
+       })
+
+       s.Run("RewriteWithConflictDetection", func() {

Review Comment:
   done 👍🏽 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to