Fokko commented on code in PR #185:
URL: https://github.com/apache/iceberg-go/pull/185#discussion_r1827422531


##########
manifest.go:
##########
@@ -627,52 +640,29 @@ func avroColMapToMap[K comparable, V any](c *[]colMap[K, V]) map[K]V {
        return out
 }
 
-func avroPartitionData(input map[string]any) map[string]any {
-       // hamba/avro/v2 will unmarshal a map[string]any such that
-       // each entry will actually be a map[string]any with the key being
-       // the avro type, not the field name.
-       //
-       // This means that partition data that looks like this:
-       //
-       //  [{"field-id": 1000, "name": "ts", "type": {"type": "int", "logicalType": "date"}}]
-       //
-       // Becomes:
-       //
-       //  map[string]any{"ts": map[string]any{"int.date": time.Time{}}}
-       //
-       // so we need to simplify our map and make the partition data handling easier
+func avroPartitionData(input map[string]any, nameToID map[string]int, logicalTypes map[int]avro.LogicalType) map[string]any {
        out := make(map[string]any)
        for k, v := range input {
-               switch v := v.(type) {
-               case map[string]any:
-                       for typeName, val := range v {
-                               switch typeName {
-                               case "int.date":
-                                       out[k] = Date(val.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour * 24).Seconds()))
-                               case "int.time-millis":
-                                       out[k] = Time(val.(time.Duration).Microseconds())
-                               case "long.time-micros":
-                                       out[k] = Time(val.(time.Duration).Microseconds())
-                               case "long.timestamp-millis":
-                                       out[k] = Timestamp(val.(time.Time).UTC().UnixMicro())
-                               case "long.timestamp-micros":
-                                       out[k] = Timestamp(val.(time.Time).UTC().UnixMicro())
-                               case "bytes.decimal":
-                                       // not implemented yet
-                               case "fixed.decimal":
-                                       // not implemented yet
+               if id, ok := nameToID[k]; ok {
+                       if logical, ok := logicalTypes[id]; ok {
+                               switch logical {
+                               case avro.Date:
+                                       out[k] = Date(v.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour * 24).Seconds()))
+                               case avro.TimeMillis:
+                                       out[k] = Time(v.(time.Duration).Microseconds())

Review Comment:
   ```suggestion
                                        out[k] = Time(v.(time.Duration).Milliseconds())
   ```
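
   For context, a standalone sketch (not part of the PR) of how the two `time.Duration` accessors differ; whichever unit the Iceberg `Time` value is defined in should drive the choice here:
   ```go
   package main

   import (
   	"fmt"
   	"time"
   )

   func main() {
   	// hamba/avro decodes time-millis/time-micros values into a
   	// time.Duration since midnight; the accessors differ by 1000x.
   	d := 90 * time.Minute
   	fmt.Println(d.Milliseconds()) // 5400000
   	fmt.Println(d.Microseconds()) // 5400000000
   }
   ```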



##########
manifest.go:
##########
@@ -627,52 +640,29 @@ func avroColMapToMap[K comparable, V any](c *[]colMap[K, V]) map[K]V {
        return out
 }
 
-func avroPartitionData(input map[string]any) map[string]any {
-       // hamba/avro/v2 will unmarshal a map[string]any such that
-       // each entry will actually be a map[string]any with the key being
-       // the avro type, not the field name.
-       //
-       // This means that partition data that looks like this:
-       //
-       //  [{"field-id": 1000, "name": "ts", "type": {"type": "int", "logicalType": "date"}}]
-       //
-       // Becomes:
-       //
-       //  map[string]any{"ts": map[string]any{"int.date": time.Time{}}}
-       //
-       // so we need to simplify our map and make the partition data handling easier
+func avroPartitionData(input map[string]any, nameToID map[string]int, logicalTypes map[int]avro.LogicalType) map[string]any {
        out := make(map[string]any)
        for k, v := range input {
-               switch v := v.(type) {
-               case map[string]any:
-                       for typeName, val := range v {
-                               switch typeName {
-                               case "int.date":
-                                       out[k] = Date(val.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour * 24).Seconds()))
-                               case "int.time-millis":
-                                       out[k] = Time(val.(time.Duration).Microseconds())
-                               case "long.time-micros":
-                                       out[k] = Time(val.(time.Duration).Microseconds())
-                               case "long.timestamp-millis":
-                                       out[k] = Timestamp(val.(time.Time).UTC().UnixMicro())
-                               case "long.timestamp-micros":
-                                       out[k] = Timestamp(val.(time.Time).UTC().UnixMicro())
-                               case "bytes.decimal":
-                                       // not implemented yet
-                               case "fixed.decimal":
-                                       // not implemented yet
+               if id, ok := nameToID[k]; ok {
+                       if logical, ok := logicalTypes[id]; ok {
+                               switch logical {
+                               case avro.Date:
+                                       out[k] = Date(v.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour * 24).Seconds()))
+                               case avro.TimeMillis:
+                                       out[k] = Time(v.(time.Duration).Microseconds())
+                               case avro.TimeMicros:
+                                       out[k] = Time(v.(time.Duration).Microseconds())
+                               case avro.TimestampMillis:
+                                       out[k] = Timestamp(v.(time.Time).UTC().UnixMicro())

Review Comment:
   ```suggestion
                                        out[k] = Timestamp(v.(time.Time).UTC().UnixMilli())
   ```
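
   And the same sketch for `time.Time` (standalone, not from the PR); the pick should match the unit the Iceberg `Timestamp` value is defined in:
   ```go
   package main

   import (
   	"fmt"
   	"time"
   )

   func main() {
   	t := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
   	fmt.Println(t.UnixMilli()) // 1704067200000    (milliseconds)
   	fmt.Println(t.UnixMicro()) // 1704067200000000 (microseconds)
   }
   ```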



##########
table/substrait/functions_set.yaml:
##########
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+%YAML 1.2
+---
+scalar_functions:
+  -
+    name: "is_in"
+    description: >
+      Checks membership of a value in a list of values
+
+      Returns true or false if `needle` is found in `haystack`.
+    impls:
+      - args:
+        - name: needle
+          value: any1
+        - name: haystack
+          value: list<any1>
+        options:
+          nan_equality:
+            values: [ NAN_IS_NAN, NAN_IS_NOT_NAN ]

Review Comment:
   I'm not deeply familiar with substrait, so be gentle. Wouldn't it make more sense to rewrite the expression, e.g. `a IS IN (null, nan, 'a', 'b', 'c')` to `a IS IN ('a', 'b', 'c') || is_nan(a) || is_null(a)`?
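
   A sketch of what that rewrite could look like with iceberg-go's expression builders (an illustration of the idea, not code from the PR; it assumes `EqualTo`, `IsNaN`, `IsNull`, `Or`, and `Reference` from this repo's expression API):
   ```go
   // Rewrite `a IS IN (null, nan, 'a', 'b', 'c')` before conversion so only
   // standard boolean/comparison functions are needed on the substrait side.
   ref := iceberg.Reference("a")
   rewritten := iceberg.Or(
   	iceberg.EqualTo(ref, "a"),
   	iceberg.EqualTo(ref, "b"),
   	iceberg.EqualTo(ref, "c"),
   	iceberg.IsNaN(ref),
   	iceberg.IsNull(ref),
   )
   ```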



##########
table/arrow_scanner.go:
##########
@@ -0,0 +1,597 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "io"
+       "iter"
+       "runtime"
+       "strconv"
+       "sync"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/compute"
+       "github.com/apache/arrow-go/v18/arrow/compute/exprs"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table/internal"
+       "github.com/apache/iceberg-go/table/substrait"
+       "github.com/substrait-io/substrait-go/expr"
+       "golang.org/x/sync/errgroup"
+)
+
+const (
+       ScanOptionArrowUseLargeTypes = "arrow.use_large_types"
+)
+
+type positionDeletes = []*arrow.Chunked
+type perFileDeletes = map[string]positionDeletes
+
+func readAllDeleteFiles(ctx context.Context, fs iceio.IO, tasks []FileScanTask) (perFileDeletes, error) {
+       var (
+               deletesPerFile = make(perFileDeletes)
+               uniqueDeletes  = make(map[string]iceberg.DataFile)
+               err            error
+       )
+
+       for _, t := range tasks {
+               for _, d := range t.DeleteFiles {

Review Comment:
   Should we call these deletes positional explicitly?



##########
table/internal/parquet_files.go:
##########
@@ -0,0 +1,421 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+       "context"
+       "fmt"
+       "slices"
+       "strconv"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/parquet"
+       "github.com/apache/arrow-go/v18/parquet/file"
+       "github.com/apache/arrow-go/v18/parquet/metadata"
+       "github.com/apache/arrow-go/v18/parquet/pqarrow"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+)
+
+type ParquetFileSource struct {
+       mem          memory.Allocator
+       fs           iceio.IO
+       file         iceberg.DataFile
+       isPosDeletes bool
+}
+
+type wrapPqArrowReader struct {
+       *pqarrow.FileReader
+}
+
+func (w wrapPqArrowReader) Close() error {
+       return w.ParquetReader().Close()
+}
+
+func (w wrapPqArrowReader) PrunedSchema(projectedIDs map[int]struct{}) (*arrow.Schema, []int, error) {
+       return pruneParquetColumns(w.Manifest, projectedIDs, false)
+}
+
+func (w wrapPqArrowReader) GetRecords(ctx context.Context, cols []int, tester any) (array.RecordReader, error) {
+       var (
+               testRg func(*metadata.RowGroupMetaData, []int) (bool, error)
+               ok     bool
+       )
+
+       if tester != nil {
+               testRg, ok = tester.(func(*metadata.RowGroupMetaData, []int) (bool, error))
+               if !ok {
+                       return nil, fmt.Errorf("%w: invalid tester function", iceberg.ErrInvalidArgument)
+               }
+       }
+
+       var rgList []int
+       if testRg != nil {
+               rgList = make([]int, 0)
+               fileMeta, numRg := w.ParquetReader().MetaData(), w.ParquetReader().NumRowGroups()
+               for rg := 0; rg < numRg; rg++ {
+                       rgMeta := fileMeta.RowGroup(rg)
+                       use, err := testRg(rgMeta, cols)
+                       if err != nil {
+                               return nil, err
+                       }
+
+                       if use {
+                               rgList = append(rgList, rg)
+                       }
+               }
+       }
+
+       return w.GetRecordReader(ctx, cols, rgList)
+}
+
+func (pfs *ParquetFileSource) GetReader(ctx context.Context) (FileReader, error) {
+       pf, err := pfs.fs.Open(pfs.file.FilePath())
+       if err != nil {
+               return nil, err
+       }
+
+       rdr, err := file.NewParquetReader(pf,
+               file.WithReadProps(parquet.NewReaderProperties(pfs.mem)))
+       if err != nil {
+               return nil, err
+       }
+
+       // TODO: grab these from the context
+       arrProps := pqarrow.ArrowReadProperties{
+               Parallel:  true,
+               BatchSize: 1 << 17,
+       }
+
+       if pfs.isPosDeletes {
+               // for dictionary for filepath col
+               arrProps.SetReadDict(0, true)
+       }
+
+       fr, err := pqarrow.NewFileReader(rdr, arrProps, pfs.mem)
+       if err != nil {
+               return nil, err
+       }
+
+       return wrapPqArrowReader{fr}, nil
+}
+
+type manifestVisitor[T any] interface {
+       Manifest(*pqarrow.SchemaManifest, []T) T
+       Field(pqarrow.SchemaField, T) T
+       Struct(pqarrow.SchemaField, []T) T
+       List(pqarrow.SchemaField, T) T
+       Map(pqarrow.SchemaField, T, T) T
+       Primitive(pqarrow.SchemaField) T
+}
+
+func visitParquetManifest[T any](manifest *pqarrow.SchemaManifest, visitor manifestVisitor[T]) (res T, err error) {
+       if manifest == nil {
+               err = fmt.Errorf("%w: cannot visit nil manifest", iceberg.ErrInvalidArgument)
+               return
+       }
+
+       defer func() {
+               if r := recover(); r != nil {
+                       err = fmt.Errorf("%s", r)
+               }
+       }()
+
+       results := make([]T, len(manifest.Fields))
+       for i, f := range manifest.Fields {
+               res := visitManifestField(f, visitor)
+               results[i] = visitor.Field(f, res)
+       }
+       return visitor.Manifest(manifest, results), nil
+}
+
+func visitParquetManifestStruct[T any](field pqarrow.SchemaField, visitor manifestVisitor[T]) T {
+       results := make([]T, len(field.Children))
+
+       for i, f := range field.Children {
+               results[i] = visitManifestField(f, visitor)
+       }
+
+       return visitor.Struct(field, results)
+}
+
+func visitManifestList[T any](field pqarrow.SchemaField, visitor manifestVisitor[T]) T {
+       elemField := field.Children[0]
+       res := visitManifestField(elemField, visitor)
+       return visitor.List(field, res)
+}
+
+func visitManifestMap[T any](field pqarrow.SchemaField, visitor manifestVisitor[T]) T {
+       kvfield := field.Children[0]
+       keyField, valField := kvfield.Children[0], kvfield.Children[1]
+
+       return visitor.Map(field, visitManifestField(keyField, visitor), visitManifestField(valField, visitor))
+}
+
+func visitManifestField[T any](field pqarrow.SchemaField, visitor manifestVisitor[T]) T {
+       switch field.Field.Type.(type) {
+       case *arrow.StructType:
+               return visitParquetManifestStruct(field, visitor)
+       case *arrow.MapType:
+               return visitManifestMap(field, visitor)
+       case arrow.ListLikeType:
+               return visitManifestList(field, visitor)
+       default:
+               return visitor.Primitive(field)
+       }
+}
+
+func pruneParquetColumns(manifest *pqarrow.SchemaManifest, selected map[int]struct{}, selectFullTypes bool) (*arrow.Schema, []int, error) {
+       visitor := &pruneParquetSchema{
+               selected:  selected,
+               manifest:  manifest,
+               fullTypes: selectFullTypes,
+               indices:   []int{},
+       }
+
+       result, err := visitParquetManifest[arrow.Field](manifest, visitor)
+       if err != nil {
+               return nil, nil, err
+       }
+
+       return arrow.NewSchema(result.Type.(*arrow.StructType).Fields(), &result.Metadata),
+               visitor.indices, nil
+}
+
+func getFieldID(f arrow.Field) *int {
+       if !f.HasMetadata() {
+               return nil
+       }
+
+       fieldIDStr, ok := f.Metadata.GetValue("PARQUET:field_id")
+       if !ok {
+               return nil
+       }
+
+       id, err := strconv.Atoi(fieldIDStr)
+       if err != nil {
+               return nil
+       }
+
+       return &id
+}
+
+type pruneParquetSchema struct {
+       selected  map[int]struct{}
+       fullTypes bool
+       manifest  *pqarrow.SchemaManifest
+
+       indices []int
+}
+
+func (p *pruneParquetSchema) fieldID(field arrow.Field) int {
+       if id := getFieldID(field); id != nil {
+               return *id
+       }
+
+       panic(fmt.Errorf("%w: cannot convert %s to Iceberg field, missing field_id",
+               iceberg.ErrInvalidSchema, field))
+}
+
+func (p *pruneParquetSchema) Manifest(manifest *pqarrow.SchemaManifest, fields []arrow.Field) arrow.Field {
+       finalFields := slices.DeleteFunc(fields, func(f arrow.Field) bool { return f.Type == nil })
+       result := arrow.Field{
+               Type: arrow.StructOf(finalFields...),
+       }
+       if manifest.SchemaMeta != nil {
+               result.Metadata = *manifest.SchemaMeta
+       }
+
+       return result
+}
+
+func (p *pruneParquetSchema) Struct(field pqarrow.SchemaField, children []arrow.Field) arrow.Field {
+       selected, fields := []arrow.Field{}, field.Children
+       sameType := true
+
+       for i, t := range children {
+               field := fields[i]
+               if arrow.TypeEqual(field.Field.Type, t.Type) {
+                       selected = append(selected, *field.Field)
+               } else if t.Type == nil {
+                       sameType = false
+                       // type has changed, create a new field with the projected type

Review Comment:
   Looks all good, you even copied the comments for convenience :)



##########
table/substrait/substrait.go:
##########
@@ -0,0 +1,379 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package substrait
+
+import (
+       _ "embed"
+       "fmt"
+       "strings"
+
+       "github.com/apache/arrow-go/v18/arrow/compute/exprs"
+       "github.com/apache/iceberg-go"
+       "github.com/substrait-io/substrait-go/expr"
+       "github.com/substrait-io/substrait-go/extensions"
+       "github.com/substrait-io/substrait-go/types"
+)
+
+//go:embed functions_set.yaml
+var funcsetYAML string
+
+var (
+       collection = extensions.DefaultCollection
+       funcSetURI = "https://github.com/apache/iceberg-go/blob/main/table/substrait/functions_set.yaml"
+)
+
+func init() {
+       if !collection.URILoaded(funcSetURI) {
+               if err := collection.Load(funcSetURI, strings.NewReader(funcsetYAML)); err != nil {
+                       panic(err)
+               }
+       }
+}
+
+func NewExtensionSet() exprs.ExtensionIDSet {
+       return exprs.NewExtensionSetDefault(expr.NewEmptyExtensionRegistry(&collection))
+}
+
+// ConvertExpr binds the provided expression to the given schema and converts it to a
+// substrait expression so that it can be utilized for computation.
+func ConvertExpr(schema *iceberg.Schema, e iceberg.BooleanExpression) (*expr.ExtensionRegistry, expr.Expression, error) {
+       base, err := ConvertSchema(schema)
+       if err != nil {
+               return nil, nil, err
+       }       
+
+       reg := expr.NewEmptyExtensionRegistry(&extensions.DefaultCollection)
+
+       bldr := expr.ExprBuilder{Reg: reg, BaseSchema: &base.Struct}
+       b, err := iceberg.VisitExpr(e, &toSubstraitExpr{bldr: bldr, schema: schema})
+       if err != nil {
+               return nil, nil, err
+       }
+
+       out, err := b.BuildExpr()
+       return &reg, out, err
+}
+
+// ConvertSchema converts an Iceberg schema to a substrait NamedStruct using
+// the appropriate types and column names.
+func ConvertSchema(schema *iceberg.Schema) (res types.NamedStruct, err error) {
+       var typ types.Type
+
+       typ, err = iceberg.Visit(schema, convertToSubstrait{})
+       if err != nil {
+               return
+       }
+
+       val := typ.(*types.StructType)
+       res.Struct = *val
+
+       res.Names = make([]string, schema.NumFields())
+       for i, f := range schema.Fields() {
+               res.Names[i] = f.Name
+       }
+
+       return
+}
+
+type convertToSubstrait struct{}
+
+func (convertToSubstrait) Schema(_ *iceberg.Schema, result types.Type) types.Type {
+       return result.WithNullability(types.NullabilityNullable)
+}
+
+func (convertToSubstrait) Struct(_ iceberg.StructType, results []types.Type) types.Type {
+       return &types.StructType{
+               Nullability: types.NullabilityUnspecified,
+               Types:       results,
+       }
+}
+
+func getNullability(required bool) types.Nullability {
+       if required {
+               return types.NullabilityRequired
+       }
+       return types.NullabilityNullable
+}
+
+func (convertToSubstrait) Field(field iceberg.NestedField, result types.Type) types.Type {
+       return result.WithNullability(getNullability(field.Required))
+}
+
+func (c convertToSubstrait) List(list iceberg.ListType, elemResult types.Type) types.Type {
+       return &types.ListType{
+               Nullability: types.NullabilityUnspecified,
+               Type:        c.Field(list.ElementField(), elemResult),
+       }
+}
+
+func (c convertToSubstrait) Map(m iceberg.MapType, keyResult, valResult types.Type) types.Type {
+       return &types.MapType{
+               Nullability: types.NullabilityUnspecified,
+               Key:         c.Field(m.KeyField(), keyResult),
+               Value:       c.Field(m.ValueField(), valResult),
+       }
+}
+
+func (convertToSubstrait) Primitive(iceberg.PrimitiveType) types.Type { panic("should not be called") }
+
+func (convertToSubstrait) VisitFixed(f iceberg.FixedType) types.Type {
+       return &types.FixedBinaryType{Length: int32(f.Len())}
+}
+
+func (convertToSubstrait) VisitDecimal(d iceberg.DecimalType) types.Type {
+       return &types.DecimalType{Precision: int32(d.Precision()), Scale: int32(d.Scale())}
+}
+
+func (convertToSubstrait) VisitBoolean() types.Type     { return &types.BooleanType{} }
+func (convertToSubstrait) VisitInt32() types.Type       { return &types.Int32Type{} }
+func (convertToSubstrait) VisitInt64() types.Type       { return &types.Int64Type{} }
+func (convertToSubstrait) VisitFloat32() types.Type     { return &types.Float32Type{} }
+func (convertToSubstrait) VisitFloat64() types.Type     { return &types.Float64Type{} }
+func (convertToSubstrait) VisitDate() types.Type        { return &types.DateType{} }
+func (convertToSubstrait) VisitTime() types.Type        { return &types.TimeType{} }
+func (convertToSubstrait) VisitTimestamp() types.Type   { return &types.TimestampType{} }
+func (convertToSubstrait) VisitTimestampTz() types.Type { return &types.TimestampTzType{} }
+func (convertToSubstrait) VisitString() types.Type      { return &types.StringType{} }
+func (convertToSubstrait) VisitBinary() types.Type      { return &types.BinaryType{} }
+func (convertToSubstrait) VisitUUID() types.Type        { return &types.UUIDType{} }
+
+var (
+       _ iceberg.SchemaVisitorPerPrimitiveType[types.Type] = (*convertToSubstrait)(nil)
+)
+
+var (
+       boolURI    = extensions.SubstraitDefaultURIPrefix + "functions_boolean.yaml"
+       compareURI = extensions.SubstraitDefaultURIPrefix + "functions_comparison.yaml"
+       stringURI  = extensions.SubstraitDefaultURIPrefix + "functions_string.yaml"
+
+       notID          = extensions.ID{URI: boolURI, Name: "not"}
+       andID          = extensions.ID{URI: boolURI, Name: "and"}
+       orID           = extensions.ID{URI: boolURI, Name: "or"}
+       isNaNID        = extensions.ID{URI: compareURI, Name: "is_nan"}
+       isNullID       = extensions.ID{URI: compareURI, Name: "is_null"}
+       isNotNullID    = extensions.ID{URI: compareURI, Name: "is_not_null"}
+       equalID        = extensions.ID{URI: compareURI, Name: "equal"}
+       notEqualID     = extensions.ID{URI: compareURI, Name: "not_equal"}
+       greaterEqualID = extensions.ID{URI: compareURI, Name: "gte"}
+       greaterID      = extensions.ID{URI: compareURI, Name: "gt"}
+       lessEqualID    = extensions.ID{URI: compareURI, Name: "lte"}
+       lessID         = extensions.ID{URI: compareURI, Name: "lt"}
+       startsWithID   = extensions.ID{URI: stringURI, Name: "starts_with"}
+       isInID         = extensions.ID{URI: funcSetURI, Name: "is_in"}
+)
+
+type toSubstraitExpr struct {
+       schema *iceberg.Schema
+       bldr   expr.ExprBuilder
+}
+
+func (t *toSubstraitExpr) VisitTrue() expr.Builder {
+       return t.bldr.Wrap(expr.NewLiteral(true, false))
+}
+
+func (t *toSubstraitExpr) VisitFalse() expr.Builder {
+       return t.bldr.Wrap(expr.NewLiteral(false, false))
+}
+
+func (t *toSubstraitExpr) VisitNot(child expr.Builder) expr.Builder {
+       return t.bldr.ScalarFunc(notID).Args(child.(expr.FuncArgBuilder))
+}
+
+func (t *toSubstraitExpr) VisitAnd(left, right expr.Builder) expr.Builder {
+       return t.bldr.ScalarFunc(andID).Args(left.(expr.FuncArgBuilder),
+               right.(expr.FuncArgBuilder))
+}
+
+func (t *toSubstraitExpr) VisitOr(left, right expr.Builder) expr.Builder {
+       return t.bldr.ScalarFunc(orID).Args(left.(expr.FuncArgBuilder),
+               right.(expr.FuncArgBuilder))
+}
+
+func (t *toSubstraitExpr) VisitUnbound(iceberg.UnboundPredicate) expr.Builder {
+       panic("can only convert bound expressions to substrait")
+}
+
+func (t *toSubstraitExpr) VisitBound(pred iceberg.BoundPredicate) expr.Builder {
+       return iceberg.VisitBoundPredicate(pred, t)
+}
+
+type substraitPrimitiveLiteralTypes interface {
+       bool | ~int32 | ~int64 | float32 | float64 | string
+}
+
+func toPrimitiveSubstraitLiteral[T substraitPrimitiveLiteralTypes](v T) expr.Literal {
+       return expr.NewPrimitiveLiteral(v, false)
+}
+
+func toByteSliceSubstraitLiteral[T []byte | types.UUID](v T) expr.Literal {
+       return expr.NewByteSliceLiteral(v, false)
+}
+
+func toDecimalLiteral(v iceberg.DecimalLiteral) expr.Literal {
+       byts, _ := v.MarshalBinary()
+       result, _ := expr.NewLiteral(&types.Decimal{
+               Scale:     int32(v.Scale),
+               Value:     byts,
+               Precision: int32(v.Type().(*iceberg.DecimalType).Precision()),
+       }, false)
+       return result
+}
+
+func toFixedLiteral(v iceberg.FixedLiteral) expr.Literal {
+       return expr.NewFixedBinaryLiteral(types.FixedBinary(v), false)
+}
+
+func toSubstraitLiteral(typ iceberg.Type, lit iceberg.Literal) expr.Literal {
+       switch lit := lit.(type) {
+       case iceberg.BoolLiteral:
+               return toPrimitiveSubstraitLiteral(bool(lit))
+       case iceberg.Int32Literal:
+               return toPrimitiveSubstraitLiteral(int32(lit))
+       case iceberg.Int64Literal:
+               return toPrimitiveSubstraitLiteral(int64(lit))
+       case iceberg.Float32Literal:
+               return toPrimitiveSubstraitLiteral(float32(lit))
+       case iceberg.Float64Literal:
+               return toPrimitiveSubstraitLiteral(float64(lit))
+       case iceberg.StringLiteral:
+               return toPrimitiveSubstraitLiteral(string(lit))
+       case iceberg.TimestampLiteral:
+               if typ.Equals(iceberg.PrimitiveTypes.TimestampTz) {
+                       return toPrimitiveSubstraitLiteral(types.TimestampTz(lit))
+               }
+               return toPrimitiveSubstraitLiteral(types.Timestamp(lit))
+       case iceberg.DateLiteral:
+               return toPrimitiveSubstraitLiteral(types.Date(lit))
+       case iceberg.TimeLiteral:
+               return toPrimitiveSubstraitLiteral(types.Time(lit))
+       case iceberg.BinaryLiteral:
+               return toByteSliceSubstraitLiteral([]byte(lit))
+       case iceberg.FixedLiteral:
+               return toFixedLiteral(lit)
+       case iceberg.UUIDLiteral:
+               return toByteSliceSubstraitLiteral(types.UUID(lit[:]))
+       case iceberg.DecimalLiteral:
+               return toDecimalLiteral(lit)
+       }
+       panic(fmt.Errorf("invalid literal type: %s", lit.Type()))
+}
+
+func toSubstraitLiteralSet(typ iceberg.Type, lits []iceberg.Literal) expr.ListLiteralValue {
+       if len(lits) == 0 {
+               return nil
+       }
+
+       out := make([]expr.Literal, len(lits))
+       for i, l := range lits {
+               out[i] = toSubstraitLiteral(typ, l)
+       }
+       return out
+}
+
+func (t *toSubstraitExpr) getRef(ref iceberg.BoundReference) expr.Reference {
+       updatedRef, err := iceberg.Reference(ref.Field().Name).Bind(t.schema, true)

Review Comment:
   Do we want to pass down the case-sensitivity from the ScanOption?
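
   One possible shape for that, as a sketch (the `caseSensitive` field is hypothetical; it would be populated from the scan's option instead of hard-coding `true` in `getRef`):
   ```go
   // Hypothetical: carry the flag on the visitor...
   type toSubstraitExpr struct {
   	schema        *iceberg.Schema
   	bldr          expr.ExprBuilder
   	caseSensitive bool // set from the scan's case-sensitivity option
   }

   // ...and use it when re-binding the reference in getRef:
   //   updatedRef, err := iceberg.Reference(ref.Field().Name).Bind(t.schema, t.caseSensitive)
   ```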



##########
table/arrow_scanner.go:
##########
@@ -0,0 +1,597 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "io"
+       "iter"
+       "runtime"
+       "strconv"
+       "sync"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/compute"
+       "github.com/apache/arrow-go/v18/arrow/compute/exprs"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table/internal"
+       "github.com/apache/iceberg-go/table/substrait"
+       "github.com/substrait-io/substrait-go/expr"
+       "golang.org/x/sync/errgroup"
+)
+
+const (
+       ScanOptionArrowUseLargeTypes = "arrow.use_large_types"
+)
+
+type positionDeletes = []*arrow.Chunked
+type perFileDeletes = map[string]positionDeletes
+
+func readAllDeleteFiles(ctx context.Context, fs iceio.IO, tasks []FileScanTask) (perFileDeletes, error) {
+       var (
+               deletesPerFile = make(perFileDeletes)
+               uniqueDeletes  = make(map[string]iceberg.DataFile)
+               err            error
+       )
+
+       for _, t := range tasks {
+               for _, d := range t.DeleteFiles {
+                       if _, ok := uniqueDeletes[d.FilePath()]; !ok {
+                               uniqueDeletes[d.FilePath()] = d
+                       }
+               }
+       }
+
+       if len(uniqueDeletes) == 0 {
+               return deletesPerFile, nil
+       }
+
+       g, ctx := errgroup.WithContext(ctx)
+       g.SetLimit(runtime.NumCPU())
+
+       perFileChan := make(chan map[string]*arrow.Chunked, runtime.NumCPU())
+       go func() {
+               defer close(perFileChan)
+               for _, v := range uniqueDeletes {
+                       g.Go(func() error {
+                               deletes, err := readDeletes(ctx, fs, v)
+                               if deletes != nil {
+                                       perFileChan <- deletes
+                               }
+                               return err
+                       })
+               }
+
+               err = g.Wait()
+       }()
+
+       for deletes := range perFileChan {
+               for file, arr := range deletes {
+                       deletesPerFile[file] = append(deletesPerFile[file], arr)
+               }
+       }
+
+       return deletesPerFile, err
+}
+
+func readDeletes(ctx context.Context, fs iceio.IO, dataFile iceberg.DataFile) (map[string]*arrow.Chunked, error) {
+       src, err := internal.GetFile(ctx, fs, dataFile, true)
+       if err != nil {
+               return nil, err
+       }
+
+       rdr, err := src.GetReader(ctx)
+       if err != nil {
+               return nil, err
+       }
+       defer rdr.Close()
+
+       tbl, err := rdr.ReadTable(ctx)
+       if err != nil {
+               return nil, err
+       }
+       defer tbl.Release()
+
+       tbl, err = array.UnifyTableDicts(compute.GetAllocator(ctx), tbl)
+       if err != nil {
+               return nil, err
+       }
+       defer tbl.Release()
+
+       filePathCol := tbl.Column(tbl.Schema().FieldIndices("file_path")[0]).Data()
+       posCol := tbl.Column(tbl.Schema().FieldIndices("pos")[0]).Data()

Review Comment:
   If you want to go full Iceberg, then you want to look up these columns by 
FieldID as well :)
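
   For reference, a sketch of a field-ID lookup reusing the `PARQUET:field_id` metadata that pqarrow attaches to each Arrow field (the reserved IDs come from the spec's position-delete layout: 2147483546 for `file_path`, 2147483545 for `pos`):
   ```go
   // Locate a column by Iceberg field ID instead of by name.
   func columnIndexByFieldID(sc *arrow.Schema, fieldID int) int {
   	want := strconv.Itoa(fieldID)
   	for i, f := range sc.Fields() {
   		if v, ok := f.Metadata.GetValue("PARQUET:field_id"); ok && v == want {
   			return i
   		}
   	}
   	return -1
   }

   // Usage inside readDeletes (tbl as in the PR):
   //   filePathCol := tbl.Column(columnIndexByFieldID(tbl.Schema(), 2147483546)).Data()
   //   posCol := tbl.Column(columnIndexByFieldID(tbl.Schema(), 2147483545)).Data()
   ```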



##########
table/internal/parquet_files.go:
##########
@@ -0,0 +1,421 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+       "context"
+       "fmt"
+       "slices"
+       "strconv"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/parquet"
+       "github.com/apache/arrow-go/v18/parquet/file"
+       "github.com/apache/arrow-go/v18/parquet/metadata"
+       "github.com/apache/arrow-go/v18/parquet/pqarrow"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+)
+
+type ParquetFileSource struct {
+       mem          memory.Allocator
+       fs           iceio.IO
+       file         iceberg.DataFile
+       isPosDeletes bool

Review Comment:
   This information can also be deduced from the `DataFile`, where we have `134: content`
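
   If so, the flag could be derived rather than stored, e.g. (a sketch, assuming the `ContentType`/`EntryContentPosDeletes` names from this repo's manifest code):
   ```go
   // Derive "is this a position-delete file?" from the manifest's
   // content field (134: content) instead of carrying a separate bool.
   func isPositionDeletes(f iceberg.DataFile) bool {
   	return f.ContentType() == iceberg.EntryContentPosDeletes
   }
   ```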



##########
README.md:
##########
@@ -77,9 +77,10 @@ $ cd iceberg-go/cmd/iceberg && go build .
 
 ### Read/Write Data Support
 
-* No intrinsic support for reading/writing data yet
-* Data can be manually read currently by retrieving data files via Manifests.
-* Plan to add [Apache Arrow](https://pkg.go.dev/github.com/apache/arrow/go/v14@v14.0.0) support eventually.
+* No intrinsic support for writing data yet.
+* Plan to add [Apache Arrow](https://pkg.go.dev/github.com/apache/arrow-go/) support eventually.
+* Data can currently be read as an Arrow Table or as a stream of Arrow
+record batches.

Review Comment:
   ```suggestion
   * Data can currently be read as an Arrow Table or as a stream of Arrow record batches.
   ```



##########
table/scanner.go:
##########
@@ -393,3 +398,62 @@ type FileScanTask struct {
        DeleteFiles   []iceberg.DataFile
        Start, Length int64
 }
+
+// ToArrowRecords returns the arrow schema of the expected records and an iterator
+// that can be used with a range expression to read the records as they are
+// available. If an error is encountered during the planning and setup, then this
+// will return the error directly. If the error occurs while iterating the records,
+// it will be returned by the iterator.
+//
+// The purpose for returning the schema up front is to handle the case where there
+// are no rows returned. The resulting Arrow Schema of the projection will still be
+// known.
+func (s *Scan) ToArrowRecords(ctx context.Context) (*arrow.Schema, iter.Seq2[arrow.Record, error], error) {

Review Comment:
   How about slightly longer variable names? :)
   ```suggestion
   func (scan *Scan) ToArrowRecords(ctx context.Context) (*arrow.Schema, iter.Seq2[arrow.Record, error], error) {
   ```
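
   For readers of the thread, a sketch of the intended consumption pattern (Go 1.23 range-over-func; `process` is a hypothetical consumer, and releasing each record is assumed to be the caller's job):
   ```go
   schema, records, err := scan.ToArrowRecords(ctx)
   if err != nil {
   	return err // planning/setup errors surface immediately
   }
   _ = schema // the projection schema is known even if no rows come back

   for rec, err := range records {
   	if err != nil {
   		return err // errors during iteration come through the iterator
   	}
   	process(rec) // hypothetical consumer
   	rec.Release()
   }
   ```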



##########
table/substrait/functions_set.yaml:
##########
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+%YAML 1.2
+---
+scalar_functions:
+  -
+    name: "is_in"
+    description: >
+      Checks membership of a value in a list of values
+
+      Returns true or false if `needle` is found in `haystack`.
+    impls:
+      - args:
+        - name: needle
+          value: any1
+        - name: haystack
+          value: list<any1>
+        options:
+          nan_equality:
+            values: [ NAN_IS_NAN, NAN_IS_NOT_NAN ]

Review Comment:
   Just sharing an idea. We could also rewrite the whole thing to just equality 
expressions :)
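
   Similar to the null/nan rewrite sketched earlier, something like this hypothetical helper could unroll the set before conversion, so only functions from the default substrait extensions are needed:
   ```go
   // Unroll a membership test into OR-ed equality predicates so the
   // converter never needs the custom is_in function.
   func unrollIsIn(col string, vals []string) iceberg.BooleanExpression {
   	var out iceberg.BooleanExpression = iceberg.AlwaysFalse{}
   	for _, v := range vals {
   		out = iceberg.Or(out, iceberg.EqualTo(iceberg.Reference(col), v))
   	}
   	return out
   }
   ```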



##########
table/evaluators.go:
##########
@@ -690,6 +706,46 @@ type inclusiveMetricsEval struct {
        includeEmptyFiles bool
 }
 
+func (m *inclusiveMetricsEval) TestRowGroup(rgmeta *metadata.RowGroupMetaData, colIndices []int) (bool, error) {

Review Comment:
   Nice one!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

