kevinjqliu commented on code in PR #330:
URL: https://github.com/apache/iceberg-go/pull/330#discussion_r1990183802


##########
table/table_test.go:
##########
@@ -128,3 +138,235 @@ func (t *TableTestSuite) TestSnapshotByName() {
 
        t.True(testSnapshot.Equals(*t.tbl.SnapshotByName("test")))
 }
+
+type TableWritingTestSuite struct {
+       suite.Suite
+
+       tableSchema      *iceberg.Schema
+       arrSchema        *arrow.Schema
+       arrTbl           arrow.Table
+       arrSchemaWithIDs *arrow.Schema
+       arrTblWithIDs    arrow.Table
+       arrSchemaUpdated *arrow.Schema
+       arrTblUpdated    arrow.Table
+
+       location      string
+       formatVersion int
+}
+
+func (t *TableWritingTestSuite) SetupSuite() {
+       mem := memory.DefaultAllocator
+
+       t.tableSchema = iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "foo", Type: 
iceberg.PrimitiveTypes.Bool},
+               iceberg.NestedField{ID: 2, Name: "bar", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 4, Name: "baz", Type: 
iceberg.PrimitiveTypes.Int32},
+               iceberg.NestedField{ID: 10, Name: "qux", Type: 
iceberg.PrimitiveTypes.Date})
+
+       t.arrSchema = arrow.NewSchema([]arrow.Field{
+               {Name: "foo", Type: arrow.FixedWidthTypes.Boolean, Nullable: 
true},
+               {Name: "bar", Type: arrow.BinaryTypes.String, Nullable: true},
+               {Name: "baz", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
+               {Name: "qux", Type: arrow.PrimitiveTypes.Date32, Nullable: 
true},
+       }, nil)
+
+       var err error
+       t.arrTbl, err = array.TableFromJSON(mem, t.arrSchema, []string{
+               `[{"foo": true, "bar": "bar_string", "baz": 123, "qux": 
"2024-03-07"}]`,
+       })
+       t.Require().NoError(err)
+
+       t.arrSchemaWithIDs = arrow.NewSchema([]arrow.Field{
+               {
+                       Name: "foo", Type: arrow.FixedWidthTypes.Boolean,
+                       Metadata: 
arrow.MetadataFrom(map[string]string{"PARQUET:field_id": "1"}),
+               },
+               {
+                       Name: "bar", Type: arrow.BinaryTypes.String,
+                       Metadata: 
arrow.MetadataFrom(map[string]string{"PARQUET:field_id": "2"}),
+               },
+               {
+                       Name: "baz", Type: arrow.PrimitiveTypes.Int32,
+                       Metadata: 
arrow.MetadataFrom(map[string]string{"PARQUET:field_id": "3"}),
+               },
+               {
+                       Name: "qux", Type: arrow.PrimitiveTypes.Date32,
+                       Metadata: 
arrow.MetadataFrom(map[string]string{"PARQUET:field_id": "4"}),
+               },
+       }, nil)
+
+       t.arrTblWithIDs, err = array.TableFromJSON(mem, t.arrSchemaWithIDs, 
[]string{
+               `[{"foo": true, "bar": "bar_string", "baz": 123, "qux": 
"2024-03-07"}]`,
+       })
+       t.Require().NoError(err)
+
+       t.arrSchemaUpdated = arrow.NewSchema([]arrow.Field{
+               {Name: "foo", Type: arrow.FixedWidthTypes.Boolean, Nullable: 
true},
+               {Name: "baz", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
+               {Name: "qux", Type: arrow.PrimitiveTypes.Date32, Nullable: 
true},
+               {Name: "quux", Type: arrow.PrimitiveTypes.Int32, Nullable: 
true},
+       }, nil)
+
+       t.arrTblUpdated, err = array.TableFromJSON(mem, t.arrSchemaUpdated, 
[]string{
+               `[{"foo": true, "baz": 123, "qux": "2024-03-07", "quux": 234}]`,
+       })
+       t.Require().NoError(err)
+}
+
+func (t *TableWritingTestSuite) SetupTest() {
+       t.location = t.T().TempDir()
+}
+
+func (t *TableWritingTestSuite) TearDownSuite() {
+       t.arrTbl.Release()
+       t.arrTblUpdated.Release()
+       t.arrTblWithIDs.Release()
+}
+
+func (t *TableWritingTestSuite) getMetadataLoc() string {
+       return fmt.Sprintf("%s/metadata/%05d-%s.metadata.json",
+               t.location, 1, uuid.New().String())
+}
+
+func (t *TableWritingTestSuite) writeParquet(fio iceio.WriteFileIO, filePath 
string, arrTbl arrow.Table) {
+       fo, err := fio.Create(filePath)
+       t.Require().NoError(err)
+
+       t.Require().NoError(pqarrow.WriteTable(arrTbl, fo, arrTbl.NumRows(),
+               nil, pqarrow.DefaultWriterProps()))
+}
+
+func (t *TableWritingTestSuite) createTable(identifier table.Identifier, 
formatVersion int, spec iceberg.PartitionSpec, sc *iceberg.Schema) *table.Table 
{
+       meta, err := table.NewMetadata(sc, &spec, table.UnsortedSortOrder,
+               t.getMetadataLoc(), iceberg.Properties{"format-version": 
strconv.Itoa(formatVersion)})
+       t.Require().NoError(err)
+
+       return table.New(identifier, meta, t.location, iceio.LocalFS{}, nil)
+}
+
+func (t *TableWritingTestSuite) TestAddFilesUnpartitioned() {
+       ident := table.Identifier{"default", "unpartitioned_table_v" + 
strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion,
+               *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       t.NotNil(tbl)
+
+       files := make([]string, 0)
+       for i := range 5 {
+               filePath := fmt.Sprintf("%s/unpartitioned/test-%d.parquet", 
t.location, i)
+               t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, t.arrTbl)
+               files = append(files, filePath)
+       }
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles(files, nil, false))
+
+       stagedTbl, err := tx.StagedTable()
+       t.Require().NoError(err)
+       t.NotNil(stagedTbl.NameMapping())
+
+       t.Equal(stagedTbl.CurrentSnapshot().Summary,
+               &table.Summary{
+                       Operation: table.OpAppend,
+                       Properties: iceberg.Properties{
+                               "added-data-files":       "5",
+                               "added-files-size":       "3660",
+                               "added-records":          "5",
+                               "total-data-files":       "5",
+                               "total-delete-files":     "0",
+                               "total-equality-deletes": "0",
+                               "total-files-size":       "3660",
+                               "total-position-deletes": "0",
+                               "total-records":          "5",
+                       },
+               })
+
+       contents, err := stagedTbl.Scan().ToArrowTable(context.Background())
+       t.Require().NoError(err)
+       defer contents.Release()
+
+       t.EqualValues(5, contents.NumRows())
+}
+
+func (t *TableWritingTestSuite) TestAddFilesFileNotFound() {
+       ident := table.Identifier{"default", 
"unpartitioned_table_file_not_found_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion,
+               *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       t.NotNil(tbl)
+
+       files := make([]string, 0)
+       for i := range 5 {
+               filePath := 
fmt.Sprintf("%s/unpartitioned_file_not_found/test-%d.parquet", t.location, i)
+               t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, t.arrTbl)
+               files = append(files, filePath)
+       }
+
+       files = append(files, 
t.location+"/unpartitioned_file_not_found/unknown.parquet")
+       tx := tbl.NewTransaction()
+       err := tx.AddFiles(files, nil, false)
+       t.Error(err)
+       t.ErrorContains(err, "no such file or directory")
+}
+
+func (t *TableWritingTestSuite) TestAddFilesUnpartitionedHasFieldIDs() {
+       ident := table.Identifier{"default", "unpartitioned_table_with_ids_v" + 
strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion,
+               *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       t.NotNil(tbl)
+
+       files := make([]string, 0)
+       for i := range 5 {
+               filePath := 
fmt.Sprintf("%s/unpartitioned_with_ids/test-%d.parquet", t.location, i)
+               t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, 
t.arrTblWithIDs)
+               files = append(files, filePath)
+       }
+
+       tx := tbl.NewTransaction()
+       err := tx.AddFiles(files, nil, false)
+       t.Error(err)
+       t.ErrorIs(err, iceberg.ErrNotImplemented)
+}
+
+func (t *TableWritingTestSuite) TestAddFilesFailsSchemaMismatch() {
+       ident := table.Identifier{"default", 
"unpartitioned_table_schema_mismatch_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion,
+               *iceberg.UnpartitionedSpec, t.tableSchema)
+       t.Require().NotNil(tbl)
+
+       wrongSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "foo", Type: arrow.FixedWidthTypes.Boolean, Nullable: 
true},
+               {Name: "bar", Type: arrow.BinaryTypes.String, Nullable: true},
+               {Name: "baz", Type: arrow.BinaryTypes.String, Nullable: true}, 
// should be int32
+               {Name: "qux", Type: arrow.PrimitiveTypes.Date32, Nullable: 
true},
+       }, nil)
+
+       filePath := 
fmt.Sprintf("%s/unpartitioned_schema_mismatch_v%d/test.parquet", t.location, 
t.formatVersion)
+       mismatchTable, err := array.TableFromJSON(memory.DefaultAllocator, 
wrongSchema, []string{
+               `[{"foo": true, "bar": "bar_string", "baz": "123", "qux": 
"2024-03-07"},
+                 {"foo": false, "bar": "bar_string", "baz": "456", "qux": 
"2024-03-07"}]`,
+       })
+       t.Require().NoError(err)
+       t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, mismatchTable)
+
+       files := []string{filePath}
+
+       pterm.DisableOutput() // disable error to console
+       defer pterm.EnableOutput()
+
+       tx := tbl.NewTransaction()
+       err = tx.AddFiles(files, nil, false)
+       t.Error(err)
+       t.EqualError(err, `error encountered during parquet file conversion: 
error encountered during schema visitor: mismatch in fields:
+   | Table Field              | Requested Field         
+✅ | 1: foo: optional boolean | 1: foo: optional boolean
+✅ | 2: bar: optional string  | 2: bar: optional string 
+❌ | 3: baz: optional int     | 3: baz: optional string 
+✅ | 4: qux: optional date    | 4: qux: optional date   
+`)
+}
+
+func TestTableWriting(t *testing.T) {
+       suite.Run(t, &TableWritingTestSuite{formatVersion: 1})

Review Comment:
   Should this suite also be run for V2 tables (e.g. with `formatVersion: 2`)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to