Fokko commented on code in PR #185:
URL: https://github.com/apache/iceberg-go/pull/185#discussion_r1827422531
##########
manifest.go:
##########
@@ -627,52 +640,29 @@ func avroColMapToMap[K comparable, V any](c *[]colMap[K, V]) map[K]V {
     return out
 }
 
-func avroPartitionData(input map[string]any) map[string]any {
-    // hambra/avro/v2 will unmarshal a map[string]any such that
-    // each entry will actually be a map[string]any with the key being
-    // the avro type, not the field name.
-    //
-    // This means that partition data that looks like this:
-    //
-    // [{"field-id": 1000, "name": "ts", "type": {"type": "int", "logicalType": "date"}}]
-    //
-    // Becomes:
-    //
-    // map[string]any{"ts": map[string]any{"int.date": time.Time{}}}
-    //
-    // so we need to simplify our map and make the partition data handling easier
+func avroPartitionData(input map[string]any, nameToID map[string]int, logicalTypes map[int]avro.LogicalType) map[string]any {
     out := make(map[string]any)
     for k, v := range input {
-        switch v := v.(type) {
-        case map[string]any:
-            for typeName, val := range v {
-                switch typeName {
-                case "int.date":
-                    out[k] = Date(val.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour * 24).Seconds()))
-                case "int.time-millis":
-                    out[k] = Time(val.(time.Duration).Microseconds())
-                case "long.time-micros":
-                    out[k] = Time(val.(time.Duration).Microseconds())
-                case "long.timestamp-millis":
-                    out[k] = Timestamp(val.(time.Time).UTC().UnixMicro())
-                case "long.timestamp-micros":
-                    out[k] = Timestamp(val.(time.Time).UTC().UnixMicro())
-                case "bytes.decimal":
-                    // not implemented yet
-                case "fixed.decimal":
-                    // not implemented yet
+        if id, ok := nameToID[k]; ok {
+            if logical, ok := logicalTypes[id]; ok {
+                switch logical {
+                case avro.Date:
+                    out[k] = Date(v.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour * 24).Seconds()))
+                case avro.TimeMillis:
+                    out[k] = Time(v.(time.Duration).Microseconds())

Review Comment:
```suggestion
                    out[k] = Time(v.(time.Duration).Milliseconds())
```

##########
manifest.go:
##########
@@ -627,52 +640,29 @@ func avroColMapToMap[K comparable, V any](c *[]colMap[K, V]) map[K]V {
     return out
 }
 
-func avroPartitionData(input map[string]any) map[string]any {
-    // hambra/avro/v2 will unmarshal a map[string]any such that
-    // each entry will actually be a map[string]any with the key being
-    // the avro type, not the field name.
-    //
-    // This means that partition data that looks like this:
-    //
-    // [{"field-id": 1000, "name": "ts", "type": {"type": "int", "logicalType": "date"}}]
-    //
-    // Becomes:
-    //
-    // map[string]any{"ts": map[string]any{"int.date": time.Time{}}}
-    //
-    // so we need to simplify our map and make the partition data handling easier
+func avroPartitionData(input map[string]any, nameToID map[string]int, logicalTypes map[int]avro.LogicalType) map[string]any {
     out := make(map[string]any)
     for k, v := range input {
-        switch v := v.(type) {
-        case map[string]any:
-            for typeName, val := range v {
-                switch typeName {
-                case "int.date":
-                    out[k] = Date(val.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour * 24).Seconds()))
-                case "int.time-millis":
-                    out[k] = Time(val.(time.Duration).Microseconds())
-                case "long.time-micros":
-                    out[k] = Time(val.(time.Duration).Microseconds())
-                case "long.timestamp-millis":
-                    out[k] = Timestamp(val.(time.Time).UTC().UnixMicro())
-                case "long.timestamp-micros":
-                    out[k] = Timestamp(val.(time.Time).UTC().UnixMicro())
-                case "bytes.decimal":
-                    // not implemented yet
-                case "fixed.decimal":
-                    // not implemented yet
+        if id, ok := nameToID[k]; ok {
+            if logical, ok := logicalTypes[id]; ok {
+                switch logical {
+                case avro.Date:
+                    out[k] = Date(v.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour * 24).Seconds()))
+                case avro.TimeMillis:
+                    out[k] = Time(v.(time.Duration).Microseconds())
+                case avro.TimeMicros:
+                    out[k] = Time(v.(time.Duration).Microseconds())
+                case avro.TimestampMillis:
+                    out[k] = Timestamp(v.(time.Time).UTC().UnixMicro())

Review Comment:
```suggestion
                    out[k] = Timestamp(v.(time.Time).UTC().UnixMilli())
```

##########
table/substrait/functions_set.yaml:
##########
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+%YAML 1.2
+---
+scalar_functions:
+  -
+    name: "is_in"
+    description: >
+      Checks membership of a value in a list of values
+
+      Returns true or false if `needle` is found in `haystack`.
+    impls:
+      - args:
+          - name: needle
+            value: any1
+          - name: haystack
+            value: list<any1>
+        options:
+          nan_equality:
+            values: [ NAN_IS_NAN, NAN_IS_NOT_NAN ]

Review Comment:
   I'm not deeply familiar with substrait, so be gentle. Wouldn't it make more sense to rewrite expressions, e.g. `a IS IN (null, nan, 'a', 'b', 'c')` to `a IS IN ('a', 'b', 'c') || is_nan(a) || is_null(a)`?
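   Roughly, a sketch of the rewrite I have in mind (illustration only — `rewriteIsIn` is a made-up helper, and it assumes iceberg-go's `Reference`, `IsIn`, `IsNaN`, and `NewOr` expression builders behave as their names suggest):

```go
package rewrite

import (
	"math"

	"github.com/apache/iceberg-go"
)

// rewriteIsIn splits NaN out of the literal set so the is_in function only
// ever sees plain values; a null in the set would get the same treatment
// via iceberg.IsNull.
func rewriteIsIn(col string, vals []float64) iceberg.BooleanExpression {
	plain := make([]float64, 0, len(vals))
	needNaN := false
	for _, v := range vals {
		if math.IsNaN(v) {
			needNaN = true
		} else {
			plain = append(plain, v)
		}
	}

	var out iceberg.BooleanExpression = iceberg.IsIn(iceberg.Reference(col), plain...)
	if needNaN {
		// NaN membership becomes an explicit is_nan check OR'd in.
		out = iceberg.NewOr(out, iceberg.IsNaN(iceberg.Reference(col)))
	}
	return out
}
```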
##########
table/arrow_scanner.go:
##########
@@ -0,0 +1,597 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+    "context"
+    "io"
+    "iter"
+    "runtime"
+    "strconv"
+    "sync"
+
+    "github.com/apache/arrow-go/v18/arrow"
+    "github.com/apache/arrow-go/v18/arrow/array"
+    "github.com/apache/arrow-go/v18/arrow/compute"
+    "github.com/apache/arrow-go/v18/arrow/compute/exprs"
+    "github.com/apache/arrow-go/v18/arrow/memory"
+    "github.com/apache/iceberg-go"
+    iceio "github.com/apache/iceberg-go/io"
+    "github.com/apache/iceberg-go/table/internal"
+    "github.com/apache/iceberg-go/table/substrait"
+    "github.com/substrait-io/substrait-go/expr"
+    "golang.org/x/sync/errgroup"
+)
+
+const (
+    ScanOptionArrowUseLargeTypes = "arrow.use_large_types"
+)
+
+type positionDeletes = []*arrow.Chunked
+type perFileDeletes = map[string]positionDeletes
+
+func readAllDeleteFiles(ctx context.Context, fs iceio.IO, tasks []FileScanTask) (perFileDeletes, error) {
+    var (
+        deletesPerFile = make(perFileDeletes)
+        uniqueDeletes  = make(map[string]iceberg.DataFile)
+        err            error
+    )
+
+    for _, t := range tasks {
+        for _, d := range t.DeleteFiles {

Review Comment:
   Should we call these deletes positional explicitly?
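   i.e., a purely cosmetic sketch of the rename (same aliases, more explicit names):

```go
package table

import "github.com/apache/arrow-go/v18/arrow"

// "positional" spelled out, since equality deletes would need a different
// representation than per-file row positions.
type positionalDeletes = []*arrow.Chunked
type perFilePositionalDeletes = map[string]positionalDeletes
```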
##########
table/internal/parquet_files.go:
##########
@@ -0,0 +1,421 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+    "context"
+    "fmt"
+    "slices"
+    "strconv"
+
+    "github.com/apache/arrow-go/v18/arrow"
+    "github.com/apache/arrow-go/v18/arrow/array"
+    "github.com/apache/arrow-go/v18/arrow/memory"
+    "github.com/apache/arrow-go/v18/parquet"
+    "github.com/apache/arrow-go/v18/parquet/file"
+    "github.com/apache/arrow-go/v18/parquet/metadata"
+    "github.com/apache/arrow-go/v18/parquet/pqarrow"
+    "github.com/apache/iceberg-go"
+    iceio "github.com/apache/iceberg-go/io"
+)
+
+type ParquetFileSource struct {
+    mem          memory.Allocator
+    fs           iceio.IO
+    file         iceberg.DataFile
+    isPosDeletes bool
+}
+
+type wrapPqArrowReader struct {
+    *pqarrow.FileReader
+}
+
+func (w wrapPqArrowReader) Close() error {
+    return w.ParquetReader().Close()
+}
+
+func (w wrapPqArrowReader) PrunedSchema(projectedIDs map[int]struct{}) (*arrow.Schema, []int, error) {
+    return pruneParquetColumns(w.Manifest, projectedIDs, false)
+}
+
+func (w wrapPqArrowReader) GetRecords(ctx context.Context, cols []int, tester any) (array.RecordReader, error) {
+    var (
+        testRg func(*metadata.RowGroupMetaData, []int) (bool, error)
+        ok     bool
+    )
+
+    if tester != nil {
+        testRg, ok = tester.(func(*metadata.RowGroupMetaData, []int) (bool, error))
+        if !ok {
+            return nil, fmt.Errorf("%w: invalid tester function", iceberg.ErrInvalidArgument)
+        }
+    }
+
+    var rgList []int
+    if testRg != nil {
+        rgList = make([]int, 0)
+        fileMeta, numRg := w.ParquetReader().MetaData(), w.ParquetReader().NumRowGroups()
+        for rg := 0; rg < numRg; rg++ {
+            rgMeta := fileMeta.RowGroup(rg)
+            use, err := testRg(rgMeta, cols)
+            if err != nil {
+                return nil, err
+            }
+
+            if use {
+                rgList = append(rgList, rg)
+            }
+        }
+    }
+
+    return w.GetRecordReader(ctx, cols, rgList)
+}
+
+func (pfs *ParquetFileSource) GetReader(ctx context.Context) (FileReader, error) {
+    pf, err := pfs.fs.Open(pfs.file.FilePath())
+    if err != nil {
+        return nil, err
+    }
+
+    rdr, err := file.NewParquetReader(pf,
+        file.WithReadProps(parquet.NewReaderProperties(pfs.mem)))
+    if err != nil {
+        return nil, err
+    }
+
+    // TODO: grab these from the context
+    arrProps := pqarrow.ArrowReadProperties{
+        Parallel:  true,
+        BatchSize: 1 << 17,
+    }
+
+    if pfs.isPosDeletes {
+        // for dictionary for filepath col
+        arrProps.SetReadDict(0, true)
+    }
+
+    fr, err := pqarrow.NewFileReader(rdr, arrProps, pfs.mem)
+    if err != nil {
+        return nil, err
+    }
+
+    return wrapPqArrowReader{fr}, nil
+}
+
+type manifestVisitor[T any] interface {
+    Manifest(*pqarrow.SchemaManifest, []T) T
+    Field(pqarrow.SchemaField, T) T
+    Struct(pqarrow.SchemaField, []T) T
+    List(pqarrow.SchemaField, T) T
+    Map(pqarrow.SchemaField, T, T) T
+    Primitive(pqarrow.SchemaField) T
+}
+
+func visitParquetManifest[T any](manifest *pqarrow.SchemaManifest, visitor manifestVisitor[T]) (res T, err error) {
+    if manifest == nil {
+        err = fmt.Errorf("%w: cannot visit nil manifest", iceberg.ErrInvalidArgument)
+        return
+    }
+
+    defer func() {
+        if r := recover(); r != nil {
+            err = fmt.Errorf("%s", r)
+        }
+    }()
+
+    results := make([]T, len(manifest.Fields))
+    for i, f := range manifest.Fields {
+        res := visitManifestField(f, visitor)
+        results[i] = visitor.Field(f, res)
+    }
+    return visitor.Manifest(manifest, results), nil
+}
+
+func visitParquetManifestStruct[T any](field pqarrow.SchemaField, visitor manifestVisitor[T]) T {
+    results := make([]T, len(field.Children))
+
+    for i, f := range field.Children {
+        results[i] = visitManifestField(f, visitor)
+    }
+
+    return visitor.Struct(field, results)
+}
+func visitManifestList[T any](field pqarrow.SchemaField, visitor manifestVisitor[T]) T {
+    elemField := field.Children[0]
+    res := visitManifestField(elemField, visitor)
+    return visitor.List(field, res)
+}
+
+func visitManifestMap[T any](field pqarrow.SchemaField, visitor manifestVisitor[T]) T {
+    kvfield := field.Children[0]
+    keyField, valField := kvfield.Children[0], kvfield.Children[1]
+
+    return visitor.Map(field, visitManifestField(keyField, visitor), visitManifestField(valField, visitor))
+}
+
+func visitManifestField[T any](field pqarrow.SchemaField, visitor manifestVisitor[T]) T {
+    switch field.Field.Type.(type) {
+    case *arrow.StructType:
+        return visitParquetManifestStruct(field, visitor)
+    case *arrow.MapType:
+        return visitManifestMap(field, visitor)
+    case arrow.ListLikeType:
+        return visitManifestList(field, visitor)
+    default:
+        return visitor.Primitive(field)
+    }
+}
+
+func pruneParquetColumns(manifest *pqarrow.SchemaManifest, selected map[int]struct{}, selectFullTypes bool) (*arrow.Schema, []int, error) {
+    visitor := &pruneParquetSchema{
+        selected:  selected,
+        manifest:  manifest,
+        fullTypes: selectFullTypes,
+        indices:   []int{},
+    }
+
+    result, err := visitParquetManifest[arrow.Field](manifest, visitor)
+    if err != nil {
+        return nil, nil, err
+    }
+
+    return arrow.NewSchema(result.Type.(*arrow.StructType).Fields(), &result.Metadata),
+        visitor.indices, nil
+}
+
+func getFieldID(f arrow.Field) *int {
+    if !f.HasMetadata() {
+        return nil
+    }
+
+    fieldIDStr, ok := f.Metadata.GetValue("PARQUET:field_id")
+    if !ok {
+        return nil
+    }
+
+    id, err := strconv.Atoi(fieldIDStr)
+    if err != nil {
+        return nil
+    }
+
+    return &id
+}
+
+type pruneParquetSchema struct {
+    selected  map[int]struct{}
+    fullTypes bool
+    manifest  *pqarrow.SchemaManifest
+
+    indices []int
+}
+
+func (p *pruneParquetSchema) fieldID(field arrow.Field) int {
+    if id := getFieldID(field); id != nil {
+        return *id
+    }
+
+    panic(fmt.Errorf("%w: cannot convert %s to Iceberg field, missing field_id",
+        iceberg.ErrInvalidSchema, field))
+}
+
+func (p *pruneParquetSchema) Manifest(manifest *pqarrow.SchemaManifest, fields []arrow.Field) arrow.Field {
+    finalFields := slices.DeleteFunc(fields, func(f arrow.Field) bool { return f.Type == nil })
+    result := arrow.Field{
+        Type: arrow.StructOf(finalFields...),
+    }
+    if manifest.SchemaMeta != nil {
+        result.Metadata = *manifest.SchemaMeta
+    }
+
+    return result
+}
+
+func (p *pruneParquetSchema) Struct(field pqarrow.SchemaField, children []arrow.Field) arrow.Field {
+    selected, fields := []arrow.Field{}, field.Children
+    sameType := true
+
+    for i, t := range children {
+        field := fields[i]
+        if arrow.TypeEqual(field.Field.Type, t.Type) {
+            selected = append(selected, *field.Field)
+        } else if t.Type == nil {
+            sameType = false
+            // type has changed, create a new field with the projected type

Review Comment:
   Looks all good, you even copied the comments for convenience :)
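   One nice consequence of the `tester` hook in `GetRecords` above is that row-group filtering stays pluggable. A stub matching the signature the type assertion expects (illustration only, not part of the PR):

```go
package internal

import "github.com/apache/arrow-go/v18/parquet/metadata"

// keepNonEmptyRowGroups matches the tester shape GetRecords asserts:
// func(*metadata.RowGroupMetaData, []int) (bool, error). A real tester
// would inspect the column statistics of the projected columns (cols)
// against the scan's bound predicate; this stub only skips empty groups.
func keepNonEmptyRowGroups(rgMeta *metadata.RowGroupMetaData, cols []int) (bool, error) {
	return rgMeta.NumRows() > 0, nil
}
```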
##########
table/substrait/substrait.go:
##########
@@ -0,0 +1,379 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package substrait
+
+import (
+    _ "embed"
+    "fmt"
+    "strings"
+
+    "github.com/apache/arrow-go/v18/arrow/compute/exprs"
+    "github.com/apache/iceberg-go"
+    "github.com/substrait-io/substrait-go/expr"
+    "github.com/substrait-io/substrait-go/extensions"
+    "github.com/substrait-io/substrait-go/types"
+)
+
+//go:embed functions_set.yaml
+var funcsetYAML string
+
+var (
+    collection = extensions.DefaultCollection
+    funcSetURI = "https://github.com/apache/iceberg-go/blob/main/table/substrait/functions_set.yaml"
+)
+
+func init() {
+    if !collection.URILoaded(funcSetURI) {
+        if err := collection.Load(funcSetURI, strings.NewReader(funcsetYAML)); err != nil {
+            panic(err)
+        }
+    }
+}
+
+func NewExtensionSet() exprs.ExtensionIDSet {
+    return exprs.NewExtensionSetDefault(expr.NewEmptyExtensionRegistry(&collection))
+}
+
+// ConvertExpr binds the provided expression to the given schema and converts it to a
+// substrait expression so that it can be utilized for computation.
+func ConvertExpr(schema *iceberg.Schema, e iceberg.BooleanExpression) (*expr.ExtensionRegistry, expr.Expression, error) {
+    base, err := ConvertSchema(schema)
+    if err != nil {
+        return nil, nil, err
+    }
+
+    reg := expr.NewEmptyExtensionRegistry(&extensions.DefaultCollection)
+
+    bldr := expr.ExprBuilder{Reg: reg, BaseSchema: &base.Struct}
+    b, err := iceberg.VisitExpr(e, &toSubstraitExpr{bldr: bldr, schema: schema})
+    if err != nil {
+        return nil, nil, err
+    }
+
+    out, err := b.BuildExpr()
+    return &reg, out, err
+}
+
+// ConvertSchema converts an Iceberg schema to a substrait NamedStruct using
+// the appropriate types and column names.
+func ConvertSchema(schema *iceberg.Schema) (res types.NamedStruct, err error) {
+    var typ types.Type
+
+    typ, err = iceberg.Visit(schema, convertToSubstrait{})
+    if err != nil {
+        return
+    }
+
+    val := typ.(*types.StructType)
+    res.Struct = *val
+
+    res.Names = make([]string, schema.NumFields())
+    for i, f := range schema.Fields() {
+        res.Names[i] = f.Name
+    }
+
+    return
+}
+
+type convertToSubstrait struct{}
+
+func (convertToSubstrait) Schema(_ *iceberg.Schema, result types.Type) types.Type {
+    return result.WithNullability(types.NullabilityNullable)
+}
+
+func (convertToSubstrait) Struct(_ iceberg.StructType, results []types.Type) types.Type {
+    return &types.StructType{
+        Nullability: types.NullabilityUnspecified,
+        Types:       results,
+    }
+}
+
+func getNullability(required bool) types.Nullability {
+    if required {
+        return types.NullabilityRequired
+    }
+    return types.NullabilityNullable
+}
+
+func (convertToSubstrait) Field(field iceberg.NestedField, result types.Type) types.Type {
+    return result.WithNullability(getNullability(field.Required))
+}
+
+func (c convertToSubstrait) List(list iceberg.ListType, elemResult types.Type) types.Type {
+    return &types.ListType{
+        Nullability: types.NullabilityUnspecified,
+        Type:        c.Field(list.ElementField(), elemResult),
+    }
+}
+
+func (c convertToSubstrait) Map(m iceberg.MapType, keyResult, valResult types.Type) types.Type {
+    return &types.MapType{
+        Nullability: types.NullabilityUnspecified,
+        Key:         c.Field(m.KeyField(), keyResult),
+        Value:       c.Field(m.ValueField(), valResult),
+    }
+}
+
+func (convertToSubstrait) Primitive(iceberg.PrimitiveType) types.Type { panic("should not be called") }
+
+func (convertToSubstrait) VisitFixed(f iceberg.FixedType) types.Type {
+    return &types.FixedBinaryType{Length: int32(f.Len())}
+}
+
+func (convertToSubstrait) VisitDecimal(d iceberg.DecimalType) types.Type {
+    return &types.DecimalType{Precision: int32(d.Precision()), Scale: int32(d.Scale())}
+}
+
+func (convertToSubstrait) VisitBoolean() types.Type     { return &types.BooleanType{} }
+func (convertToSubstrait) VisitInt32() types.Type       { return &types.Int32Type{} }
+func (convertToSubstrait) VisitInt64() types.Type       { return &types.Int64Type{} }
+func (convertToSubstrait) VisitFloat32() types.Type     { return &types.Float32Type{} }
+func (convertToSubstrait) VisitFloat64() types.Type     { return &types.Float64Type{} }
+func (convertToSubstrait) VisitDate() types.Type        { return &types.DateType{} }
+func (convertToSubstrait) VisitTime() types.Type        { return &types.TimeType{} }
+func (convertToSubstrait) VisitTimestamp() types.Type   { return &types.TimestampType{} }
+func (convertToSubstrait) VisitTimestampTz() types.Type { return &types.TimestampTzType{} }
+func (convertToSubstrait) VisitString() types.Type      { return &types.StringType{} }
+func (convertToSubstrait) VisitBinary() types.Type      { return &types.BinaryType{} }
+func (convertToSubstrait) VisitUUID() types.Type        { return &types.UUIDType{} }
+
+var (
+    _ iceberg.SchemaVisitorPerPrimitiveType[types.Type] = (*convertToSubstrait)(nil)
+)
+
+var (
+    boolURI    = extensions.SubstraitDefaultURIPrefix + "functions_boolean.yaml"
+    compareURI = extensions.SubstraitDefaultURIPrefix + "functions_comparison.yaml"
+    stringURI  = extensions.SubstraitDefaultURIPrefix + "functions_string.yaml"
+
+    notID          = extensions.ID{URI: boolURI, Name: "not"}
+    andID          = extensions.ID{URI: boolURI, Name: "and"}
+    orID           = extensions.ID{URI: boolURI, Name: "or"}
+    isNaNID        = extensions.ID{URI: compareURI, Name: "is_nan"}
+    isNullID       = extensions.ID{URI: compareURI, Name: "is_null"}
+    isNotNullID    = extensions.ID{URI: compareURI, Name: "is_not_null"}
+    equalID        = extensions.ID{URI: compareURI, Name: "equal"}
+    notEqualID     = extensions.ID{URI: compareURI, Name: "not_equal"}
+    greaterEqualID = extensions.ID{URI: compareURI, Name: "gte"}
+    greaterID      = extensions.ID{URI: compareURI, Name: "gt"}
+    lessEqualID    = extensions.ID{URI: compareURI, Name: "lte"}
+    lessID         = extensions.ID{URI: compareURI, Name: "lt"}
+    startsWithID   = extensions.ID{URI: stringURI, Name: "starts_with"}
+    isInID         = extensions.ID{URI: funcSetURI, Name: "is_in"}
+)
+
+type toSubstraitExpr struct {
+    schema *iceberg.Schema
+    bldr   expr.ExprBuilder
+}
+
+func (t *toSubstraitExpr) VisitTrue() expr.Builder {
+    return t.bldr.Wrap(expr.NewLiteral(true, false))
+}
+
+func (t *toSubstraitExpr) VisitFalse() expr.Builder {
+    return t.bldr.Wrap(expr.NewLiteral(false, false))
+}
+
+func (t *toSubstraitExpr) VisitNot(child expr.Builder) expr.Builder {
+    return t.bldr.ScalarFunc(notID).Args(child.(expr.FuncArgBuilder))
+}
+
+func (t *toSubstraitExpr) VisitAnd(left, right expr.Builder) expr.Builder {
+    return t.bldr.ScalarFunc(andID).Args(left.(expr.FuncArgBuilder),
+        right.(expr.FuncArgBuilder))
+}
+
+func (t *toSubstraitExpr) VisitOr(left, right expr.Builder) expr.Builder {
+    return t.bldr.ScalarFunc(orID).Args(left.(expr.FuncArgBuilder),
+        right.(expr.FuncArgBuilder))
+}
+
+func (t *toSubstraitExpr) VisitUnbound(iceberg.UnboundPredicate) expr.Builder {
+    panic("can only convert bound expressions to substrait")
+}
+
+func (t *toSubstraitExpr) VisitBound(pred iceberg.BoundPredicate) expr.Builder {
+    return iceberg.VisitBoundPredicate(pred, t)
+}
+
+type substraitPrimitiveLiteralTypes interface {
+    bool | ~int32 | ~int64 | float32 | float64 | string
+}
+
+func toPrimitiveSubstraitLiteral[T substraitPrimitiveLiteralTypes](v T) expr.Literal {
+    return expr.NewPrimitiveLiteral(v, false)
+}
+
+func toByteSliceSubstraitLiteral[T []byte | types.UUID](v T) expr.Literal {
+    return expr.NewByteSliceLiteral(v, false)
+}
+
+func toDecimalLiteral(v iceberg.DecimalLiteral) expr.Literal {
+    byts, _ := v.MarshalBinary()
+    result, _ := expr.NewLiteral(&types.Decimal{
+        Scale:     int32(v.Scale),
+        Value:     byts,
+        Precision: int32(v.Type().(*iceberg.DecimalType).Precision()),
+    }, false)
+    return result
+}
+
+func toFixedLiteral(v iceberg.FixedLiteral) expr.Literal {
+    return expr.NewFixedBinaryLiteral(types.FixedBinary(v), false)
+}
+
+func toSubstraitLiteral(typ iceberg.Type, lit iceberg.Literal) expr.Literal {
+    switch lit := lit.(type) {
+    case iceberg.BoolLiteral:
+        return toPrimitiveSubstraitLiteral(bool(lit))
+    case iceberg.Int32Literal:
+        return toPrimitiveSubstraitLiteral(int32(lit))
+    case iceberg.Int64Literal:
+        return toPrimitiveSubstraitLiteral(int64(lit))
+    case iceberg.Float32Literal:
+        return toPrimitiveSubstraitLiteral(float32(lit))
+    case iceberg.Float64Literal:
+        return toPrimitiveSubstraitLiteral(float64(lit))
+    case iceberg.StringLiteral:
+        return toPrimitiveSubstraitLiteral(string(lit))
+    case iceberg.TimestampLiteral:
+        if typ.Equals(iceberg.PrimitiveTypes.TimestampTz) {
+            return toPrimitiveSubstraitLiteral(types.TimestampTz(lit))
+        }
+        return toPrimitiveSubstraitLiteral(types.Timestamp(lit))
+    case iceberg.DateLiteral:
+        return toPrimitiveSubstraitLiteral(types.Date(lit))
+    case iceberg.TimeLiteral:
+        return toPrimitiveSubstraitLiteral(types.Time(lit))
+    case iceberg.BinaryLiteral:
+        return toByteSliceSubstraitLiteral([]byte(lit))
+    case iceberg.FixedLiteral:
+        return toFixedLiteral(lit)
+    case iceberg.UUIDLiteral:
+        return toByteSliceSubstraitLiteral(types.UUID(lit[:]))
+    case iceberg.DecimalLiteral:
+        return toDecimalLiteral(lit)
+    }
+    panic(fmt.Errorf("invalid literal type: %s", lit.Type()))
+}
+
+func toSubstraitLiteralSet(typ iceberg.Type, lits []iceberg.Literal) expr.ListLiteralValue {
+    if len(lits) == 0 {
+        return nil
+    }
+
+    out := make([]expr.Literal, len(lits))
+    for i, l := range lits {
+        out[i] = toSubstraitLiteral(typ, l)
+    }
+    return out
+}
+
+func (t *toSubstraitExpr) getRef(ref iceberg.BoundReference) expr.Reference {
+    updatedRef, err := iceberg.Reference(ref.Field().Name).Bind(t.schema, true)

Review Comment:
   Do we want to pass down the case-sensitivity from the ScanOption?
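   i.e., something like this (sketch only — `caseSensitive` is a hypothetical field that would need to be plumbed through, it is not in the PR):

```go
package substrait

import (
	"github.com/apache/iceberg-go"
	"github.com/substrait-io/substrait-go/expr"
)

// Hypothetical variant: carry the scan's case sensitivity on the visitor
// instead of hard-coding true when re-binding the reference.
type toSubstraitExpr struct {
	schema        *iceberg.Schema
	bldr          expr.ExprBuilder
	caseSensitive bool // would come from the table.Scan options
}

// bindRef shows the only change getRef would need: use the flag.
func (t *toSubstraitExpr) bindRef(name string) error {
	_, err := iceberg.Reference(name).Bind(t.schema, t.caseSensitive)
	return err
}
```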
##########
table/arrow_scanner.go:
##########
@@ -0,0 +1,597 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+    "context"
+    "io"
+    "iter"
+    "runtime"
+    "strconv"
+    "sync"
+
+    "github.com/apache/arrow-go/v18/arrow"
+    "github.com/apache/arrow-go/v18/arrow/array"
+    "github.com/apache/arrow-go/v18/arrow/compute"
+    "github.com/apache/arrow-go/v18/arrow/compute/exprs"
+    "github.com/apache/arrow-go/v18/arrow/memory"
+    "github.com/apache/iceberg-go"
+    iceio "github.com/apache/iceberg-go/io"
+    "github.com/apache/iceberg-go/table/internal"
+    "github.com/apache/iceberg-go/table/substrait"
+    "github.com/substrait-io/substrait-go/expr"
+    "golang.org/x/sync/errgroup"
+)
+
+const (
+    ScanOptionArrowUseLargeTypes = "arrow.use_large_types"
+)
+
+type positionDeletes = []*arrow.Chunked
+type perFileDeletes = map[string]positionDeletes
+
+func readAllDeleteFiles(ctx context.Context, fs iceio.IO, tasks []FileScanTask) (perFileDeletes, error) {
+    var (
+        deletesPerFile = make(perFileDeletes)
+        uniqueDeletes  = make(map[string]iceberg.DataFile)
+        err            error
+    )
+
+    for _, t := range tasks {
+        for _, d := range t.DeleteFiles {
+            if _, ok := uniqueDeletes[d.FilePath()]; !ok {
+                uniqueDeletes[d.FilePath()] = d
+            }
+        }
+    }
+
+    if len(uniqueDeletes) == 0 {
+        return deletesPerFile, nil
+    }
+
+    g, ctx := errgroup.WithContext(ctx)
+    g.SetLimit(runtime.NumCPU())
+
+    perFileChan := make(chan map[string]*arrow.Chunked, runtime.NumCPU())
+    go func() {
+        defer close(perFileChan)
+        for _, v := range uniqueDeletes {
+            g.Go(func() error {
+                deletes, err := readDeletes(ctx, fs, v)
+                if deletes != nil {
+                    perFileChan <- deletes
+                }
+                return err
+            })
+        }
+
+        err = g.Wait()
+    }()
+
+    for deletes := range perFileChan {
+        for file, arr := range deletes {
+            deletesPerFile[file] = append(deletesPerFile[file], arr)
+        }
+    }
+
+    return deletesPerFile, err
+}
+
+func readDeletes(ctx context.Context, fs iceio.IO, dataFile iceberg.DataFile) (map[string]*arrow.Chunked, error) {
+    src, err := internal.GetFile(ctx, fs, dataFile, true)
+    if err != nil {
+        return nil, err
+    }
+
+    rdr, err := src.GetReader(ctx)
+    if err != nil {
+        return nil, err
+    }
+    defer rdr.Close()
+
+    tbl, err := rdr.ReadTable(ctx)
+    if err != nil {
+        return nil, err
+    }
+    defer tbl.Release()
+
+    tbl, err = array.UnifyTableDicts(compute.GetAllocator(ctx), tbl)
+    if err != nil {
+        return nil, err
+    }
+    defer tbl.Release()
+
+    filePathCol := tbl.Column(tbl.Schema().FieldIndices("file_path")[0]).Data()
+    posCol := tbl.Column(tbl.Schema().FieldIndices("pos")[0]).Data()

Review Comment:
   If you want to go full Iceberg, then you want to look up these columns by FieldID as well :)
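   For reference, the spec reserves field IDs 2147483546 (`file_path`) and 2147483545 (`pos`) for position delete files. A lookup sketch (hypothetical helper, not in the PR) that matches on the same `PARQUET:field_id` metadata the pruning code already reads:

```go
package table

import (
	"strconv"

	"github.com/apache/arrow-go/v18/arrow"
)

// Reserved field IDs for position delete files, per the Iceberg spec.
const (
	fieldIDFilePath = 2147483546
	fieldIDPos      = 2147483545
)

// columnByFieldID finds a column index by the PARQUET:field_id metadata
// instead of by name; returns -1 when the ID isn't present.
func columnByFieldID(schema *arrow.Schema, fieldID int) int {
	for i, f := range schema.Fields() {
		if v, ok := f.Metadata.GetValue("PARQUET:field_id"); ok {
			if id, err := strconv.Atoi(v); err == nil && id == fieldID {
				return i
			}
		}
	}
	return -1
}
```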
##########
table/internal/parquet_files.go:
##########
@@ -0,0 +1,421 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+    "context"
+    "fmt"
+    "slices"
+    "strconv"
+
+    "github.com/apache/arrow-go/v18/arrow"
+    "github.com/apache/arrow-go/v18/arrow/array"
+    "github.com/apache/arrow-go/v18/arrow/memory"
+    "github.com/apache/arrow-go/v18/parquet"
+    "github.com/apache/arrow-go/v18/parquet/file"
+    "github.com/apache/arrow-go/v18/parquet/metadata"
+    "github.com/apache/arrow-go/v18/parquet/pqarrow"
+    "github.com/apache/iceberg-go"
+    iceio "github.com/apache/iceberg-go/io"
+)
+
+type ParquetFileSource struct {
+    mem          memory.Allocator
+    fs           iceio.IO
+    file         iceberg.DataFile
+    isPosDeletes bool

Review Comment:
   This information can also be deduced from the `DataFile`, where we have `134: content`
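   i.e., roughly (sketch; assumes iceberg-go exposes the manifest `content` field as `DataFile.ContentType` with an `EntryContentPosDeletes` constant):

```go
package internal

import "github.com/apache/iceberg-go"

// isPosDeletes derives "is this a positional delete file?" from the data
// file's content field (field 134 in the manifest spec) instead of
// carrying a separate flag.
func isPosDeletes(f iceberg.DataFile) bool {
	return f.ContentType() == iceberg.EntryContentPosDeletes
}
```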
##########
README.md:
##########
@@ -77,9 +77,10 @@ $ cd iceberg-go/cmd/iceberg && go build .
 
 ### Read/Write Data Support
 
-* No intrinsic support for reading/writing data yet
-* Data can be manually read currently by retrieving data files via Manifests.
-* Plan to add [Apache Arrow](https://pkg.go.dev/github.com/apache/arrow/go/v14@v14.0.0) support eventually.
+* No intrinsic support for writing data yet.
+* Plan to add [Apache Arrow](https://pkg.go.dev/github.com/apache/arrow-go/) support eventually.
+* Data can currently be read as an Arrow Table or as a stream of Arrow
+record batches.

Review Comment:
```suggestion
* Data can currently be read as an Arrow Table or as a stream of Arrow record batches.
```
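   For context, reading as an Arrow Table then looks roughly like this (sketch; assumes a `ToArrowTable` counterpart to the `ToArrowRecords` API below, and a table already loaded from a catalog):

```go
package main

import (
	"context"
	"fmt"

	"github.com/apache/iceberg-go/table"
)

// readTable is a sketch: scan an Iceberg table and materialize it as a
// single Arrow table. tbl would come from a catalog lookup.
func readTable(ctx context.Context, tbl *table.Table) error {
	scan := tbl.Scan() // optionally with a row filter / selected fields
	arrTbl, err := scan.ToArrowTable(ctx)
	if err != nil {
		return err
	}
	defer arrTbl.Release()

	fmt.Println(arrTbl.NumRows())
	return nil
}
```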
##########
table/scanner.go:
##########
@@ -393,3 +398,62 @@ type FileScanTask struct {
     DeleteFiles   []iceberg.DataFile
     Start, Length int64
 }
+
+// ToArrowRecords returns the arrow schema of the expected records and an interator
+// that can be used with a range expression to read the records as they are available.
+// If an error is encountered, during the planning and setup then this will return the
+// error directly. If the error occurs while iterating the records, it will be returned
+// by the iterator.
+//
+// The purpose for returning the schema up front is to handle the case where there are no
+// rows returned. The resulting Arrow Schema of the projection will still be known.
+func (s *Scan) ToArrowRecords(ctx context.Context) (*arrow.Schema, iter.Seq2[arrow.Record, error], error) {

Review Comment:
   How about slightly longer variable names? :)
```suggestion
func (scan *Scan) ToArrowRecords(ctx context.Context) (*arrow.Schema, iter.Seq2[arrow.Record, error], error) {
```
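   Consuming the iterator with Go 1.23 range-over-func then reads naturally (sketch; `tbl` is assumed to come from a catalog lookup):

```go
package main

import (
	"context"
	"fmt"

	"github.com/apache/iceberg-go/table"
)

// streamRecords consumes the record iterator returned by ToArrowRecords.
func streamRecords(ctx context.Context, tbl *table.Table) error {
	schema, records, err := tbl.Scan().ToArrowRecords(ctx)
	if err != nil {
		return err
	}
	fmt.Println(schema) // known up front, even when no rows come back

	for rec, err := range records {
		if err != nil {
			return err // error surfaced mid-iteration
		}
		fmt.Println(rec.NumRows())
		rec.Release()
	}
	return nil
}
```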
##########
table/substrait/functions_set.yaml:
##########
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+%YAML 1.2
+---
+scalar_functions:
+  -
+    name: "is_in"
+    description: >
+      Checks membership of a value in a list of values
+
+      Returns true or false if `needle` is found in `haystack`.
+    impls:
+      - args:
+          - name: needle
+            value: any1
+          - name: haystack
+            value: list<any1>
+        options:
+          nan_equality:
+            values: [ NAN_IS_NAN, NAN_IS_NOT_NAN ]

Review Comment:
   Just sharing an idea. We could also rewrite the whole thing to just equality expressions :)

##########
table/evaluators.go:
##########
@@ -690,6 +706,46 @@ type inclusiveMetricsEval struct {
     includeEmptyFiles bool
 }
 
+func (m *inclusiveMetricsEval) TestRowGroup(rgmeta *metadata.RowGroupMetaData, colIndices []int) (bool, error) {

Review Comment:
   Nice one!

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org