stevenzwu commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1043944439
########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ########## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.data; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; 
+import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; + +@Internal +public class StructRowData implements RowData { + private final Types.StructType type; + private RowKind kind; + private StructLike struct; + + public StructRowData(Types.StructType type) { + this(type, RowKind.INSERT); + } + + public StructRowData(Types.StructType type, RowKind kind) { + this(type, null, kind); + } + + private StructRowData(Types.StructType type, StructLike struct) { + this(type, struct, RowKind.INSERT); + } + + private StructRowData(Types.StructType type, StructLike struct, RowKind kind) { + this.type = type; + this.struct = struct; + this.kind = kind; + } + + public StructRowData setStruct(StructLike newStruct) { + this.struct = newStruct; + return this; + } + + @Override + public int getArity() { + return struct.size(); + } + + @Override + public RowKind getRowKind() { + return kind; + } + + @Override + public void setRowKind(RowKind newKind) { + Preconditions.checkNotNull(newKind, "kind can not be null"); + this.kind = newKind; + } + + @Override + public boolean isNullAt(int pos) { + return struct.get(pos, Object.class) == null; + } + + @Override + public boolean getBoolean(int pos) { + return struct.get(pos, Boolean.class); + } + + @Override + public byte getByte(int pos) { + return (byte) (int) struct.get(pos, Integer.class); + } + + @Override + public short getShort(int pos) { + return (short) (int) struct.get(pos, Integer.class); + } + + @Override + public int getInt(int pos) { + Object integer = struct.get(pos, Object.class); + + if (integer instanceof Integer) { + return (int) integer; + } else if (integer instanceof LocalDate) { + return (int) ((LocalDate) integer).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for int field. 
Type name: " + integer.getClass().getName()); + } + } + + @Override + public long getLong(int pos) { + Object longVal = struct.get(pos, Object.class); + + if (longVal instanceof Long) { + return (long) longVal; + } else if (longVal instanceof OffsetDateTime) { + return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000; + } else if (longVal instanceof LocalDate) { + return ((LocalDate) longVal).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for long field. Type name: " + longVal.getClass().getName()); + } + } + + @Override + public float getFloat(int pos) { + return struct.get(pos, Float.class); + } + + @Override + public double getDouble(int pos) { + return struct.get(pos, Double.class); + } + + @Override + public StringData getString(int pos) { + return isNullAt(pos) ? null : getStringDataInternal(pos); + } + + private StringData getStringDataInternal(int pos) { + CharSequence seq = struct.get(pos, CharSequence.class); + return StringData.fromString(seq.toString()); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return isNullAt(pos) + ? null + : DecimalData.fromBigDecimal(getDecimalInternal(pos), precision, scale); + } + + private BigDecimal getDecimalInternal(int pos) { + return struct.get(pos, BigDecimal.class); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + long timeLong = getLong(pos); + return TimestampData.fromEpochMillis(timeLong / 1000, (int) (timeLong % 1000) * 1000); + } + + @Override + public <T> RawValueData<T> getRawValue(int pos) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public byte[] getBinary(int pos) { + return isNullAt(pos) ? 
null : getBinaryInternal(pos); + } + + private byte[] getBinaryInternal(int pos) { + Object bytes = struct.get(pos, Object.class); + + // should only be either ByteBuffer or byte[] + if (bytes instanceof ByteBuffer) { + return ByteBuffers.toByteArray((ByteBuffer) bytes); + } else if (bytes instanceof byte[]) { + return (byte[]) bytes; + } else { + throw new IllegalStateException( + "Unknown type for binary field. Type name: " + bytes.getClass().getName()); + } + } + + @Override + public ArrayData getArray(int pos) { + return isNullAt(pos) + ? null + : (ArrayData) + covertFromStructToRowData( + type.fields().get(pos).type().asListType(), struct.get(pos, List.class)); + } + + @Override + public MapData getMap(int pos) { + return isNullAt(pos) + ? null + : (MapData) + covertFromStructToRowData( + type.fields().get(pos).type().asMapType(), struct.get(pos, Map.class)); + } + + @Override + public RowData getRow(int pos, int numFields) { + return isNullAt(pos) ? null : getStructRowData(pos, numFields); + } + + private StructRowData getStructRowData(int pos, int numFields) { + return new StructRowData( + type.fields().get(pos).type().asStructType(), struct.get(pos, StructLike.class)); + } + + private Object covertFromStructToRowData(Type elementType, Object value) { Review Comment: this is not actually converting from struct to RowData. It is `convertingIcebergValueToFlinkRowDataValue` or maybe simpler form as `convertValue` ########## api/src/main/java/org/apache/iceberg/types/ReassignDoc.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.types; + +import java.util.List; +import java.util.function.Supplier; +import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class ReassignDoc extends TypeUtil.CustomOrderSchemaVisitor<Type> { + private final Schema docSourceSchema; + + public ReassignDoc(Schema docSourceSchema) { + this.docSourceSchema = docSourceSchema; + } + + @Override + public Type schema(Schema schema, Supplier<Type> future) { + return future.get(); + } + + @Override + public Type struct(Types.StructType struct, Iterable<Type> fieldTypes) { + List<Types.NestedField> fields = struct.fields(); + int length = fields.size(); + + List<Type> types = Lists.newArrayList(fieldTypes); + List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(length); + for (int i = 0; i < length; i += 1) { + Types.NestedField field = fields.get(i); + int fieldId = field.fieldId(); + Types.NestedField docField = docSourceSchema.findField(fieldId); + + if (docField == null) { Review Comment: I think Iceberg coding style is to use `Preconditions.check` for cases like this. ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ########## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.data; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; + +@Internal +public class StructRowData implements RowData { + private final Types.StructType type; + private RowKind kind; + private StructLike struct; + + public StructRowData(Types.StructType type) { + this(type, 
RowKind.INSERT); + } + + public StructRowData(Types.StructType type, RowKind kind) { + this(type, null, kind); + } + + private StructRowData(Types.StructType type, StructLike struct) { + this(type, struct, RowKind.INSERT); + } + + private StructRowData(Types.StructType type, StructLike struct, RowKind kind) { + this.type = type; + this.struct = struct; + this.kind = kind; + } + + public StructRowData setStruct(StructLike newStruct) { + this.struct = newStruct; + return this; + } + + @Override + public int getArity() { + return struct.size(); + } + + @Override + public RowKind getRowKind() { + return kind; + } + + @Override + public void setRowKind(RowKind newKind) { + Preconditions.checkNotNull(newKind, "kind can not be null"); + this.kind = newKind; + } + + @Override + public boolean isNullAt(int pos) { + return struct.get(pos, Object.class) == null; + } + + @Override + public boolean getBoolean(int pos) { + return struct.get(pos, Boolean.class); + } + + @Override + public byte getByte(int pos) { + return (byte) (int) struct.get(pos, Integer.class); + } + + @Override + public short getShort(int pos) { + return (short) (int) struct.get(pos, Integer.class); + } + + @Override + public int getInt(int pos) { + Object integer = struct.get(pos, Object.class); + + if (integer instanceof Integer) { + return (int) integer; + } else if (integer instanceof LocalDate) { + return (int) ((LocalDate) integer).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for int field. 
Type name: " + integer.getClass().getName()); + } + } + + @Override + public long getLong(int pos) { + Object longVal = struct.get(pos, Object.class); + + if (longVal instanceof Long) { + return (long) longVal; + } else if (longVal instanceof OffsetDateTime) { + return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000; + } else if (longVal instanceof LocalDate) { + return ((LocalDate) longVal).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for long field. Type name: " + longVal.getClass().getName()); + } + } + + @Override + public float getFloat(int pos) { + return struct.get(pos, Float.class); + } + + @Override + public double getDouble(int pos) { + return struct.get(pos, Double.class); + } + + @Override + public StringData getString(int pos) { + return isNullAt(pos) ? null : getStringDataInternal(pos); + } + + private StringData getStringDataInternal(int pos) { + CharSequence seq = struct.get(pos, CharSequence.class); + return StringData.fromString(seq.toString()); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return isNullAt(pos) + ? null + : DecimalData.fromBigDecimal(getDecimalInternal(pos), precision, scale); + } + + private BigDecimal getDecimalInternal(int pos) { + return struct.get(pos, BigDecimal.class); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + long timeLong = getLong(pos); + return TimestampData.fromEpochMillis(timeLong / 1000, (int) (timeLong % 1000) * 1000); + } + + @Override + public <T> RawValueData<T> getRawValue(int pos) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public byte[] getBinary(int pos) { + return isNullAt(pos) ? 
null : getBinaryInternal(pos); + } + + private byte[] getBinaryInternal(int pos) { + Object bytes = struct.get(pos, Object.class); + + // should only be either ByteBuffer or byte[] + if (bytes instanceof ByteBuffer) { + return ByteBuffers.toByteArray((ByteBuffer) bytes); + } else if (bytes instanceof byte[]) { + return (byte[]) bytes; + } else { + throw new IllegalStateException( + "Unknown type for binary field. Type name: " + bytes.getClass().getName()); + } + } + + @Override + public ArrayData getArray(int pos) { + return isNullAt(pos) + ? null + : (ArrayData) + covertFromStructToRowData( + type.fields().get(pos).type().asListType(), struct.get(pos, List.class)); + } + + @Override + public MapData getMap(int pos) { + return isNullAt(pos) + ? null + : (MapData) + covertFromStructToRowData( + type.fields().get(pos).type().asMapType(), struct.get(pos, Map.class)); + } + + @Override + public RowData getRow(int pos, int numFields) { + return isNullAt(pos) ? null : getStructRowData(pos, numFields); + } + + private StructRowData getStructRowData(int pos, int numFields) { + return new StructRowData( + type.fields().get(pos).type().asStructType(), struct.get(pos, StructLike.class)); + } + + private Object covertFromStructToRowData(Type elementType, Object value) { + switch (elementType.typeId()) { + case BOOLEAN: + case INTEGER: + case DATE: + case TIME: + case LONG: + case FLOAT: + case DOUBLE: + case DECIMAL: + return value; + case TIMESTAMP: + return TimestampData.fromEpochMillis( + (Long) value / 1000, (int) ((Long) value % 1000) * 1000); Review Comment: nit: `Long` to `long` to avoid extra boxing Might be a little easier to read if it is broken down. 
``` // Iceberg timestamp field has microsecond precision long millisecond = (long) value / 1000; int nanoOfMillisecond = (int) ((long) value % 1000) * 1000; return TimestampData.fromEpochMillis(millisecond, nanoOfMillisecond); ``` ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java: ########## @@ -295,6 +299,161 @@ private static void assertEquals( } } + public static void assertEqualsSafe( Review Comment: What does `Safe` mean here? Should these just be called `assertEquals`? nit: Iceberg coding style would probably use the full name `records` instead of `recs`. ########## api/src/main/java/org/apache/iceberg/types/TypeUtil.java: ########## @@ -299,6 +299,28 @@ public static Schema reassignIds(Schema schema, Schema idSourceSchema) { return reassignIds(schema, idSourceSchema, true); } + /** + * Reassigns doc in a schema from another schema. + * + * <p>Doc are determined by field id. If a field in the schema cannot be found in the source + * schema, this will throw IllegalArgumentException. + * + * <p>This will not alter a schema's structure, nullability, or types. + * + * @param schema the schema to have doc reassigned + * @param docSourceSchema the schema from which field doc will be used + * @return an structurally identical schema with field ids matching the source schema + * @throws IllegalArgumentException if a field cannot be found (by id) in the source schema + */ + public static Schema reassignDoc(Schema schema, Schema docSourceSchema) { Review Comment: I'd like to get more feedback on adding a new public method in the api module. cc @rdblue @aokolnychyi @RussellSpitzer @szehon-ho Here is the context for why we need this. The converter in `FlinkSchemaUtil` creates an Iceberg `Schema` without doc attributes. Hence @hililiwei implemented the util class and method to carry over the doc attributes from `docSourceSchema` to `schema` (converted from Flink `TableSchema`).
``` public static Schema convert(TableSchema schema) { } ``` I suggested moving the method into the `TypeUtil` class in the api module, as it is very similar to the `reassignIds` method in that class. ########## api/src/main/java/org/apache/iceberg/types/ReassignDoc.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.iceberg.types; + +import java.util.List; +import java.util.function.Supplier; +import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class ReassignDoc extends TypeUtil.CustomOrderSchemaVisitor<Type> { Review Comment: this doesn't need to be public ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java: ########## @@ -295,6 +299,161 @@ private static void assertEquals( } } + public static void assertEqualsSafe( + Schema schema, List<GenericData.Record> recs, List<Row> rows) { + Streams.forEachPair( + recs.stream(), rows.stream(), (rec, row) -> assertEqualsSafe(schema, rec, row)); + } + + public static void assertEqualsSafe(Schema schema, GenericData.Record rec, Row row) { + List<Types.NestedField> fields = schema.asStruct().fields(); + RowType rowType = FlinkSchemaUtil.convert(schema); + for (int i = 0; i < fields.size(); i += 1) { + Type fieldType = fields.get(i).type(); + Object expectedValue = rec.get(i); + Object actualValue = row.getField(i); + LogicalType logicalType = rowType.getTypeAt(i); + assertAvroEquals(fieldType, logicalType, expectedValue, actualValue); + } + } + + private static void assertEqualsSafe(Types.StructType struct, GenericData.Record rec, Row row) { + List<Types.NestedField> fields = struct.fields(); + for (int i = 0; i < fields.size(); i += 1) { + Type fieldType = fields.get(i).type(); + Object expectedValue = rec.get(i); + Object actualValue = row.getField(i); + assertAvroEquals(fieldType, null, expectedValue, actualValue); + } + } + + private static void assertAvroEquals( + Type type, LogicalType logicalType, Object expected, Object actual) { + + if (expected == null && actual == null) { + return; + } + + Assert.assertTrue( + "expected and actual should be both null or not null", expected != null && actual != null); + + switch (type.typeId()) { + case BOOLEAN: Review Comment: nit: seems that some primitive types can be 
collapsed without repetitions ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ########## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.data; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import 
org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; + +@Internal +public class StructRowData implements RowData { + private final Types.StructType type; + private RowKind kind; + private StructLike struct; + + public StructRowData(Types.StructType type) { + this(type, RowKind.INSERT); + } + + public StructRowData(Types.StructType type, RowKind kind) { + this(type, null, kind); + } + + private StructRowData(Types.StructType type, StructLike struct) { + this(type, struct, RowKind.INSERT); + } + + private StructRowData(Types.StructType type, StructLike struct, RowKind kind) { + this.type = type; + this.struct = struct; + this.kind = kind; + } + + public StructRowData setStruct(StructLike newStruct) { + this.struct = newStruct; + return this; + } + + @Override + public int getArity() { + return struct.size(); + } + + @Override + public RowKind getRowKind() { + return kind; + } + + @Override + public void setRowKind(RowKind newKind) { + Preconditions.checkNotNull(newKind, "kind can not be null"); + this.kind = newKind; + } + + @Override + public boolean isNullAt(int pos) { + return struct.get(pos, Object.class) == null; + } + + @Override + public boolean getBoolean(int pos) { + return struct.get(pos, Boolean.class); + } + + @Override + public byte getByte(int pos) { + return (byte) (int) struct.get(pos, Integer.class); + } + + @Override + public short getShort(int pos) { + return (short) (int) struct.get(pos, Integer.class); + } + + @Override + public int getInt(int pos) { + Object integer = struct.get(pos, Object.class); + + if (integer instanceof Integer) { + return (int) integer; + } else if (integer instanceof LocalDate) { + return (int) ((LocalDate) integer).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for int field. 
Type name: " + integer.getClass().getName()); + } + } + + @Override + public long getLong(int pos) { + Object longVal = struct.get(pos, Object.class); + + if (longVal instanceof Long) { + return (long) longVal; + } else if (longVal instanceof OffsetDateTime) { + return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000; + } else if (longVal instanceof LocalDate) { + return ((LocalDate) longVal).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for long field. Type name: " + longVal.getClass().getName()); + } + } + + @Override + public float getFloat(int pos) { + return struct.get(pos, Float.class); + } + + @Override + public double getDouble(int pos) { + return struct.get(pos, Double.class); + } + + @Override + public StringData getString(int pos) { + return isNullAt(pos) ? null : getStringDataInternal(pos); + } + + private StringData getStringDataInternal(int pos) { + CharSequence seq = struct.get(pos, CharSequence.class); + return StringData.fromString(seq.toString()); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return isNullAt(pos) + ? null + : DecimalData.fromBigDecimal(getDecimalInternal(pos), precision, scale); + } + + private BigDecimal getDecimalInternal(int pos) { + return struct.get(pos, BigDecimal.class); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + long timeLong = getLong(pos); + return TimestampData.fromEpochMillis(timeLong / 1000, (int) (timeLong % 1000) * 1000); + } + + @Override + public <T> RawValueData<T> getRawValue(int pos) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public byte[] getBinary(int pos) { + return isNullAt(pos) ? 
null : getBinaryInternal(pos); + } + + private byte[] getBinaryInternal(int pos) { + Object bytes = struct.get(pos, Object.class); + + // should only be either ByteBuffer or byte[] + if (bytes instanceof ByteBuffer) { + return ByteBuffers.toByteArray((ByteBuffer) bytes); + } else if (bytes instanceof byte[]) { + return (byte[]) bytes; + } else { + throw new IllegalStateException( + "Unknown type for binary field. Type name: " + bytes.getClass().getName()); + } + } + + @Override + public ArrayData getArray(int pos) { + return isNullAt(pos) + ? null + : (ArrayData) + covertFromStructToRowData( + type.fields().get(pos).type().asListType(), struct.get(pos, List.class)); + } + + @Override + public MapData getMap(int pos) { + return isNullAt(pos) + ? null + : (MapData) + covertFromStructToRowData( + type.fields().get(pos).type().asMapType(), struct.get(pos, Map.class)); + } + + @Override + public RowData getRow(int pos, int numFields) { + return isNullAt(pos) ? null : getStructRowData(pos, numFields); + } + + private StructRowData getStructRowData(int pos, int numFields) { + return new StructRowData( + type.fields().get(pos).type().asStructType(), struct.get(pos, StructLike.class)); + } + + private Object covertFromStructToRowData(Type elementType, Object value) { + switch (elementType.typeId()) { + case BOOLEAN: + case INTEGER: + case DATE: + case TIME: + case LONG: + case FLOAT: + case DOUBLE: + case DECIMAL: + return value; + case TIMESTAMP: + return TimestampData.fromEpochMillis( + (Long) value / 1000, (int) ((Long) value % 1000) * 1000); + case STRING: + return StringData.fromString(value.toString()); + case FIXED: + case BINARY: + return ByteBuffers.toByteArray((ByteBuffer) value); + case STRUCT: + return new StructRowData(elementType.asStructType(), (StructLike) value); + case LIST: + List<?> list = (List<?>) value; + Object[] array = new Object[list.size()]; Review Comment: technically, since we know the list element type, we can build the accurate array as 
`T[]`, if the list element type is `T` ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ########## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.data; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import 
org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; + +@Internal +public class StructRowData implements RowData { + private final Types.StructType type; + private RowKind kind; + private StructLike struct; + + public StructRowData(Types.StructType type) { + this(type, RowKind.INSERT); + } + + public StructRowData(Types.StructType type, RowKind kind) { + this(type, null, kind); + } + + private StructRowData(Types.StructType type, StructLike struct) { + this(type, struct, RowKind.INSERT); + } + + private StructRowData(Types.StructType type, StructLike struct, RowKind kind) { + this.type = type; + this.struct = struct; + this.kind = kind; + } + + public StructRowData setStruct(StructLike newStruct) { + this.struct = newStruct; + return this; + } + + @Override + public int getArity() { + return struct.size(); + } + + @Override + public RowKind getRowKind() { + return kind; + } + + @Override + public void setRowKind(RowKind newKind) { + Preconditions.checkNotNull(newKind, "kind can not be null"); + this.kind = newKind; + } + + @Override + public boolean isNullAt(int pos) { + return struct.get(pos, Object.class) == null; + } + + @Override + public boolean getBoolean(int pos) { + return struct.get(pos, Boolean.class); + } + + @Override + public byte getByte(int pos) { + return (byte) (int) struct.get(pos, Integer.class); + } + + @Override + public short getShort(int pos) { + return (short) (int) struct.get(pos, Integer.class); + } + + @Override + public int getInt(int pos) { + Object integer = struct.get(pos, Object.class); + + if (integer instanceof Integer) { + return (int) integer; + } else if (integer instanceof LocalDate) { + return (int) ((LocalDate) integer).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for int field. 
Type name: " + integer.getClass().getName()); + } + } + + @Override + public long getLong(int pos) { + Object longVal = struct.get(pos, Object.class); + + if (longVal instanceof Long) { + return (long) longVal; + } else if (longVal instanceof OffsetDateTime) { + return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000; + } else if (longVal instanceof LocalDate) { + return ((LocalDate) longVal).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for long field. Type name: " + longVal.getClass().getName()); + } + } + + @Override + public float getFloat(int pos) { + return struct.get(pos, Float.class); + } + + @Override + public double getDouble(int pos) { + return struct.get(pos, Double.class); + } + + @Override + public StringData getString(int pos) { + return isNullAt(pos) ? null : getStringDataInternal(pos); + } + + private StringData getStringDataInternal(int pos) { + CharSequence seq = struct.get(pos, CharSequence.class); + return StringData.fromString(seq.toString()); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return isNullAt(pos) + ? null + : DecimalData.fromBigDecimal(getDecimalInternal(pos), precision, scale); + } + + private BigDecimal getDecimalInternal(int pos) { + return struct.get(pos, BigDecimal.class); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + long timeLong = getLong(pos); + return TimestampData.fromEpochMillis(timeLong / 1000, (int) (timeLong % 1000) * 1000); + } + + @Override + public <T> RawValueData<T> getRawValue(int pos) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public byte[] getBinary(int pos) { + return isNullAt(pos) ? 
null : getBinaryInternal(pos); + } + + private byte[] getBinaryInternal(int pos) { + Object bytes = struct.get(pos, Object.class); + + // should only be either ByteBuffer or byte[] + if (bytes instanceof ByteBuffer) { + return ByteBuffers.toByteArray((ByteBuffer) bytes); + } else if (bytes instanceof byte[]) { + return (byte[]) bytes; + } else { + throw new IllegalStateException( + "Unknown type for binary field. Type name: " + bytes.getClass().getName()); + } + } + + @Override + public ArrayData getArray(int pos) { + return isNullAt(pos) + ? null + : (ArrayData) + covertFromStructToRowData( + type.fields().get(pos).type().asListType(), struct.get(pos, List.class)); + } + + @Override + public MapData getMap(int pos) { + return isNullAt(pos) + ? null + : (MapData) + covertFromStructToRowData( + type.fields().get(pos).type().asMapType(), struct.get(pos, Map.class)); + } + + @Override + public RowData getRow(int pos, int numFields) { + return isNullAt(pos) ? null : getStructRowData(pos, numFields); + } + + private StructRowData getStructRowData(int pos, int numFields) { + return new StructRowData( + type.fields().get(pos).type().asStructType(), struct.get(pos, StructLike.class)); + } + + private Object covertFromStructToRowData(Type elementType, Object value) { + switch (elementType.typeId()) { + case BOOLEAN: + case INTEGER: + case DATE: + case TIME: + case LONG: + case FLOAT: + case DOUBLE: + case DECIMAL: + return value; + case TIMESTAMP: + return TimestampData.fromEpochMillis( + (Long) value / 1000, (int) ((Long) value % 1000) * 1000); + case STRING: + return StringData.fromString(value.toString()); + case FIXED: + case BINARY: + return ByteBuffers.toByteArray((ByteBuffer) value); + case STRUCT: + return new StructRowData(elementType.asStructType(), (StructLike) value); + case LIST: + List<?> list = (List<?>) value; + Object[] array = new Object[list.size()]; Review Comment: Java arrays are covariant. Hence `Object[]` can hold references to any type. 
But I am wondering if it will cause problems for `equals`, e.g. when comparing `Object[]` with `Integer[]`. We will see once we write up the unit test (in a separate PR). ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ########## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.data; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; + +@Internal +public class StructRowData implements RowData { + private final Types.StructType type; + private RowKind kind; + private StructLike struct; + + public StructRowData(Types.StructType type) { + this(type, RowKind.INSERT); + } + + public StructRowData(Types.StructType type, RowKind kind) { + this(type, null, kind); + } + + private StructRowData(Types.StructType type, StructLike struct) { + this(type, struct, RowKind.INSERT); + } + + private StructRowData(Types.StructType type, StructLike struct, RowKind kind) { + this.type = type; + this.struct = struct; + this.kind = kind; + } + + public StructRowData setStruct(StructLike newStruct) { + this.struct = newStruct; + return this; + } + + @Override + public int getArity() { + return struct.size(); + } + + @Override + public RowKind getRowKind() { + 
return kind; + } + + @Override + public void setRowKind(RowKind newKind) { + Preconditions.checkNotNull(newKind, "kind can not be null"); + this.kind = newKind; + } + + @Override + public boolean isNullAt(int pos) { + return struct.get(pos, Object.class) == null; + } + + @Override + public boolean getBoolean(int pos) { + return struct.get(pos, Boolean.class); + } + + @Override + public byte getByte(int pos) { + return (byte) (int) struct.get(pos, Integer.class); + } + + @Override + public short getShort(int pos) { + return (short) (int) struct.get(pos, Integer.class); + } + + @Override + public int getInt(int pos) { + Object integer = struct.get(pos, Object.class); + + if (integer instanceof Integer) { + return (int) integer; + } else if (integer instanceof LocalDate) { + return (int) ((LocalDate) integer).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for int field. Type name: " + integer.getClass().getName()); + } + } + + @Override + public long getLong(int pos) { + Object longVal = struct.get(pos, Object.class); + + if (longVal instanceof Long) { + return (long) longVal; + } else if (longVal instanceof OffsetDateTime) { + return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000; + } else if (longVal instanceof LocalDate) { + return ((LocalDate) longVal).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for long field. Type name: " + longVal.getClass().getName()); + } + } + + @Override + public float getFloat(int pos) { + return struct.get(pos, Float.class); + } + + @Override + public double getDouble(int pos) { + return struct.get(pos, Double.class); + } + + @Override + public StringData getString(int pos) { + return isNullAt(pos) ? 
null : getStringDataInternal(pos); + } + + private StringData getStringDataInternal(int pos) { + CharSequence seq = struct.get(pos, CharSequence.class); + return StringData.fromString(seq.toString()); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return isNullAt(pos) + ? null + : DecimalData.fromBigDecimal(getDecimalInternal(pos), precision, scale); + } + + private BigDecimal getDecimalInternal(int pos) { + return struct.get(pos, BigDecimal.class); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + long timeLong = getLong(pos); + return TimestampData.fromEpochMillis(timeLong / 1000, (int) (timeLong % 1000) * 1000); + } + + @Override + public <T> RawValueData<T> getRawValue(int pos) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public byte[] getBinary(int pos) { + return isNullAt(pos) ? null : getBinaryInternal(pos); + } + + private byte[] getBinaryInternal(int pos) { + Object bytes = struct.get(pos, Object.class); + + // should only be either ByteBuffer or byte[] + if (bytes instanceof ByteBuffer) { + return ByteBuffers.toByteArray((ByteBuffer) bytes); + } else if (bytes instanceof byte[]) { + return (byte[]) bytes; + } else { + throw new IllegalStateException( + "Unknown type for binary field. Type name: " + bytes.getClass().getName()); + } + } + + @Override + public ArrayData getArray(int pos) { + return isNullAt(pos) + ? null + : (ArrayData) + covertFromStructToRowData( + type.fields().get(pos).type().asListType(), struct.get(pos, List.class)); + } + + @Override + public MapData getMap(int pos) { + return isNullAt(pos) + ? null + : (MapData) + covertFromStructToRowData( + type.fields().get(pos).type().asMapType(), struct.get(pos, Map.class)); + } + + @Override + public RowData getRow(int pos, int numFields) { + return isNullAt(pos) ? 
null : getStructRowData(pos, numFields); + } + + private StructRowData getStructRowData(int pos, int numFields) { + return new StructRowData( + type.fields().get(pos).type().asStructType(), struct.get(pos, StructLike.class)); + } + + private Object covertFromStructToRowData(Type elementType, Object value) { + switch (elementType.typeId()) { + case BOOLEAN: + case INTEGER: + case DATE: + case TIME: + case LONG: + case FLOAT: + case DOUBLE: + case DECIMAL: + return value; + case TIMESTAMP: + return TimestampData.fromEpochMillis( + (Long) value / 1000, (int) ((Long) value % 1000) * 1000); + case STRING: + return StringData.fromString(value.toString()); + case FIXED: + case BINARY: + return ByteBuffers.toByteArray((ByteBuffer) value); + case STRUCT: + return new StructRowData(elementType.asStructType(), (StructLike) value); + case LIST: + List<?> list = (List<?>) value; + Object[] array = new Object[list.size()]; + + int index = 0; + for (Object element : list) { + if (element == null) { + array[index] = null; + } else { + array[index] = + covertFromStructToRowData(elementType.asListType().elementType(), element); + } + + index += 1; + } + return new GenericArrayData(array); + case MAP: + Types.MapType mapType = elementType.asMapType(); + // make a defensive copy to ensure entries do not change Review Comment: Is this necessary? If we need a defensive copy for the map, we should do it for every field, right? ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ########## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.data; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; + +@Internal +public class StructRowData implements RowData { + private final Types.StructType type; + private RowKind kind; + private StructLike struct; + + public StructRowData(Types.StructType type) { + this(type, RowKind.INSERT); + } + + public StructRowData(Types.StructType type, RowKind kind) { + this(type, null, kind); + } + + private StructRowData(Types.StructType type, StructLike 
struct) { + this(type, struct, RowKind.INSERT); + } + + private StructRowData(Types.StructType type, StructLike struct, RowKind kind) { + this.type = type; + this.struct = struct; + this.kind = kind; + } + + public StructRowData setStruct(StructLike newStruct) { + this.struct = newStruct; + return this; + } + + @Override + public int getArity() { + return struct.size(); + } + + @Override + public RowKind getRowKind() { + return kind; + } + + @Override + public void setRowKind(RowKind newKind) { + Preconditions.checkNotNull(newKind, "kind can not be null"); + this.kind = newKind; + } + + @Override + public boolean isNullAt(int pos) { + return struct.get(pos, Object.class) == null; + } + + @Override + public boolean getBoolean(int pos) { + return struct.get(pos, Boolean.class); + } + + @Override + public byte getByte(int pos) { + return (byte) (int) struct.get(pos, Integer.class); + } + + @Override + public short getShort(int pos) { + return (short) (int) struct.get(pos, Integer.class); + } + + @Override + public int getInt(int pos) { + Object integer = struct.get(pos, Object.class); + + if (integer instanceof Integer) { + return (int) integer; + } else if (integer instanceof LocalDate) { + return (int) ((LocalDate) integer).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for int field. Type name: " + integer.getClass().getName()); + } + } + + @Override + public long getLong(int pos) { + Object longVal = struct.get(pos, Object.class); + + if (longVal instanceof Long) { + return (long) longVal; + } else if (longVal instanceof OffsetDateTime) { + return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000; + } else if (longVal instanceof LocalDate) { + return ((LocalDate) longVal).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for long field. 
Type name: " + longVal.getClass().getName()); + } + } + + @Override + public float getFloat(int pos) { + return struct.get(pos, Float.class); + } + + @Override + public double getDouble(int pos) { + return struct.get(pos, Double.class); + } + + @Override + public StringData getString(int pos) { + return isNullAt(pos) ? null : getStringDataInternal(pos); + } + + private StringData getStringDataInternal(int pos) { + CharSequence seq = struct.get(pos, CharSequence.class); + return StringData.fromString(seq.toString()); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return isNullAt(pos) + ? null + : DecimalData.fromBigDecimal(getDecimalInternal(pos), precision, scale); + } + + private BigDecimal getDecimalInternal(int pos) { + return struct.get(pos, BigDecimal.class); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + long timeLong = getLong(pos); + return TimestampData.fromEpochMillis(timeLong / 1000, (int) (timeLong % 1000) * 1000); + } + + @Override + public <T> RawValueData<T> getRawValue(int pos) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public byte[] getBinary(int pos) { + return isNullAt(pos) ? null : getBinaryInternal(pos); + } + + private byte[] getBinaryInternal(int pos) { + Object bytes = struct.get(pos, Object.class); + + // should only be either ByteBuffer or byte[] + if (bytes instanceof ByteBuffer) { + return ByteBuffers.toByteArray((ByteBuffer) bytes); + } else if (bytes instanceof byte[]) { + return (byte[]) bytes; + } else { + throw new IllegalStateException( + "Unknown type for binary field. Type name: " + bytes.getClass().getName()); + } + } + + @Override + public ArrayData getArray(int pos) { + return isNullAt(pos) + ? null + : (ArrayData) + covertFromStructToRowData( + type.fields().get(pos).type().asListType(), struct.get(pos, List.class)); + } + + @Override + public MapData getMap(int pos) { + return isNullAt(pos) + ? 
null + : (MapData) + covertFromStructToRowData( + type.fields().get(pos).type().asMapType(), struct.get(pos, Map.class)); + } + + @Override + public RowData getRow(int pos, int numFields) { + return isNullAt(pos) ? null : getStructRowData(pos, numFields); + } + + private StructRowData getStructRowData(int pos, int numFields) { + return new StructRowData( + type.fields().get(pos).type().asStructType(), struct.get(pos, StructLike.class)); + } + + private Object covertFromStructToRowData(Type elementType, Object value) { + switch (elementType.typeId()) { + case BOOLEAN: + case INTEGER: + case DATE: + case TIME: + case LONG: + case FLOAT: + case DOUBLE: + case DECIMAL: + return value; + case TIMESTAMP: + return TimestampData.fromEpochMillis( + (Long) value / 1000, (int) ((Long) value % 1000) * 1000); + case STRING: + return StringData.fromString(value.toString()); + case FIXED: + case BINARY: + return ByteBuffers.toByteArray((ByteBuffer) value); + case STRUCT: + return new StructRowData(elementType.asStructType(), (StructLike) value); + case LIST: + List<?> list = (List<?>) value; + Object[] array = new Object[list.size()]; + + int index = 0; + for (Object element : list) { + if (element == null) { + array[index] = null; + } else { + array[index] = + covertFromStructToRowData(elementType.asListType().elementType(), element); + } + + index += 1; + } + return new GenericArrayData(array); + case MAP: + Types.MapType mapType = elementType.asMapType(); + // make a defensive copy to ensure entries do not change + List<Map.Entry<?, ?>> entries = ImmutableList.copyOf(((Map<?, ?>) value).entrySet()); + + Map<Object, Object> result = Maps.newHashMap(); + for (Map.Entry<?, ?> entry : entries) { + final Object keyValue = covertFromStructToRowData(mapType.keyType(), entry.getKey()); + final Object valueValue = + covertFromStructToRowData(mapType.valueType(), entry.getValue()); + result.put(keyValue, valueValue); + } + return new GenericMapData(result); + default: + throw new 
UnsupportedOperationException("Unsupported array element type: " + elementType); Review Comment: is the error msg `array element type` correct? ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ########## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.data; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; + +@Internal +public class StructRowData implements RowData { + private final Types.StructType type; + private RowKind kind; + private StructLike struct; + + public StructRowData(Types.StructType type) { + this(type, RowKind.INSERT); + } + + public StructRowData(Types.StructType type, RowKind kind) { + this(type, null, kind); + } + + private StructRowData(Types.StructType type, StructLike struct) { + this(type, struct, RowKind.INSERT); + } + + private StructRowData(Types.StructType type, StructLike struct, RowKind kind) { + this.type = type; + this.struct = struct; + this.kind = kind; + } + + public StructRowData setStruct(StructLike newStruct) { + this.struct = newStruct; + return this; + } + + @Override + public int getArity() { + return struct.size(); + } + + @Override + public RowKind getRowKind() { + 
return kind; + } + + @Override + public void setRowKind(RowKind newKind) { + Preconditions.checkNotNull(newKind, "kind can not be null"); + this.kind = newKind; + } + + @Override + public boolean isNullAt(int pos) { + return struct.get(pos, Object.class) == null; + } + + @Override + public boolean getBoolean(int pos) { + return struct.get(pos, Boolean.class); + } + + @Override + public byte getByte(int pos) { + return (byte) (int) struct.get(pos, Integer.class); + } + + @Override + public short getShort(int pos) { + return (short) (int) struct.get(pos, Integer.class); + } + + @Override + public int getInt(int pos) { + Object integer = struct.get(pos, Object.class); + + if (integer instanceof Integer) { + return (int) integer; + } else if (integer instanceof LocalDate) { + return (int) ((LocalDate) integer).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for int field. Type name: " + integer.getClass().getName()); + } + } + + @Override + public long getLong(int pos) { + Object longVal = struct.get(pos, Object.class); + + if (longVal instanceof Long) { + return (long) longVal; + } else if (longVal instanceof OffsetDateTime) { + return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000; + } else if (longVal instanceof LocalDate) { + return ((LocalDate) longVal).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for long field. Type name: " + longVal.getClass().getName()); + } + } + + @Override + public float getFloat(int pos) { + return struct.get(pos, Float.class); + } + + @Override + public double getDouble(int pos) { + return struct.get(pos, Double.class); + } + + @Override + public StringData getString(int pos) { + return isNullAt(pos) ? 
null : getStringDataInternal(pos); + } + + private StringData getStringDataInternal(int pos) { + CharSequence seq = struct.get(pos, CharSequence.class); + return StringData.fromString(seq.toString()); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return isNullAt(pos) + ? null + : DecimalData.fromBigDecimal(getDecimalInternal(pos), precision, scale); + } + + private BigDecimal getDecimalInternal(int pos) { + return struct.get(pos, BigDecimal.class); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + long timeLong = getLong(pos); + return TimestampData.fromEpochMillis(timeLong / 1000, (int) (timeLong % 1000) * 1000); + } + + @Override + public <T> RawValueData<T> getRawValue(int pos) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public byte[] getBinary(int pos) { + return isNullAt(pos) ? null : getBinaryInternal(pos); + } + + private byte[] getBinaryInternal(int pos) { + Object bytes = struct.get(pos, Object.class); + + // should only be either ByteBuffer or byte[] + if (bytes instanceof ByteBuffer) { + return ByteBuffers.toByteArray((ByteBuffer) bytes); + } else if (bytes instanceof byte[]) { + return (byte[]) bytes; + } else { + throw new IllegalStateException( + "Unknown type for binary field. Type name: " + bytes.getClass().getName()); + } + } + + @Override + public ArrayData getArray(int pos) { + return isNullAt(pos) + ? null + : (ArrayData) + covertFromStructToRowData( + type.fields().get(pos).type().asListType(), struct.get(pos, List.class)); + } + + @Override + public MapData getMap(int pos) { + return isNullAt(pos) + ? null + : (MapData) + covertFromStructToRowData( + type.fields().get(pos).type().asMapType(), struct.get(pos, Map.class)); + } + + @Override + public RowData getRow(int pos, int numFields) { + return isNullAt(pos) ? 
null : getStructRowData(pos, numFields); + } + + private StructRowData getStructRowData(int pos, int numFields) { + return new StructRowData( + type.fields().get(pos).type().asStructType(), struct.get(pos, StructLike.class)); + } + + private Object covertFromStructToRowData(Type elementType, Object value) { + switch (elementType.typeId()) { + case BOOLEAN: + case INTEGER: + case DATE: + case TIME: + case LONG: + case FLOAT: + case DOUBLE: + case DECIMAL: + return value; + case TIMESTAMP: + return TimestampData.fromEpochMillis( + (Long) value / 1000, (int) ((Long) value % 1000) * 1000); + case STRING: + return StringData.fromString(value.toString()); + case FIXED: + case BINARY: + return ByteBuffers.toByteArray((ByteBuffer) value); + case STRUCT: + return new StructRowData(elementType.asStructType(), (StructLike) value); + case LIST: + List<?> list = (List<?>) value; + Object[] array = new Object[list.size()]; + Review Comment: nit: empty line not needed here ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ########## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.data; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; + +@Internal +public class StructRowData implements RowData { + private final Types.StructType type; + private RowKind kind; + private StructLike struct; + + public StructRowData(Types.StructType type) { + this(type, RowKind.INSERT); + } + + public StructRowData(Types.StructType type, RowKind kind) { + this(type, null, kind); + } + + private StructRowData(Types.StructType type, StructLike struct) { + this(type, struct, RowKind.INSERT); + } + + private StructRowData(Types.StructType type, StructLike struct, RowKind kind) { + this.type = type; + this.struct = struct; + this.kind = kind; + } + + public StructRowData setStruct(StructLike newStruct) { + this.struct = newStruct; + return this; + } + + @Override + public int getArity() { + return struct.size(); + } + + @Override + public RowKind getRowKind() { + 
return kind; + } + + @Override + public void setRowKind(RowKind newKind) { + Preconditions.checkNotNull(newKind, "kind can not be null"); + this.kind = newKind; + } + + @Override + public boolean isNullAt(int pos) { + return struct.get(pos, Object.class) == null; + } + + @Override + public boolean getBoolean(int pos) { + return struct.get(pos, Boolean.class); + } + + @Override + public byte getByte(int pos) { + return (byte) (int) struct.get(pos, Integer.class); + } + + @Override + public short getShort(int pos) { + return (short) (int) struct.get(pos, Integer.class); + } + + @Override + public int getInt(int pos) { + Object integer = struct.get(pos, Object.class); + + if (integer instanceof Integer) { + return (int) integer; + } else if (integer instanceof LocalDate) { + return (int) ((LocalDate) integer).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for int field. Type name: " + integer.getClass().getName()); + } + } + + @Override + public long getLong(int pos) { + Object longVal = struct.get(pos, Object.class); + + if (longVal instanceof Long) { + return (long) longVal; + } else if (longVal instanceof OffsetDateTime) { + return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000; + } else if (longVal instanceof LocalDate) { + return ((LocalDate) longVal).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for long field. Type name: " + longVal.getClass().getName()); + } + } + + @Override + public float getFloat(int pos) { + return struct.get(pos, Float.class); + } + + @Override + public double getDouble(int pos) { + return struct.get(pos, Double.class); + } + + @Override + public StringData getString(int pos) { + return isNullAt(pos) ? 
null : getStringDataInternal(pos); + } + + private StringData getStringDataInternal(int pos) { + CharSequence seq = struct.get(pos, CharSequence.class); + return StringData.fromString(seq.toString()); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return isNullAt(pos) + ? null + : DecimalData.fromBigDecimal(getDecimalInternal(pos), precision, scale); + } + + private BigDecimal getDecimalInternal(int pos) { + return struct.get(pos, BigDecimal.class); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + long timeLong = getLong(pos); + return TimestampData.fromEpochMillis(timeLong / 1000, (int) (timeLong % 1000) * 1000); + } + + @Override + public <T> RawValueData<T> getRawValue(int pos) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public byte[] getBinary(int pos) { + return isNullAt(pos) ? null : getBinaryInternal(pos); + } + + private byte[] getBinaryInternal(int pos) { + Object bytes = struct.get(pos, Object.class); + + // should only be either ByteBuffer or byte[] + if (bytes instanceof ByteBuffer) { + return ByteBuffers.toByteArray((ByteBuffer) bytes); + } else if (bytes instanceof byte[]) { + return (byte[]) bytes; + } else { + throw new IllegalStateException( + "Unknown type for binary field. Type name: " + bytes.getClass().getName()); + } + } + + @Override + public ArrayData getArray(int pos) { + return isNullAt(pos) + ? null + : (ArrayData) + covertFromStructToRowData( + type.fields().get(pos).type().asListType(), struct.get(pos, List.class)); + } + + @Override + public MapData getMap(int pos) { + return isNullAt(pos) + ? null + : (MapData) + covertFromStructToRowData( + type.fields().get(pos).type().asMapType(), struct.get(pos, Map.class)); + } + + @Override + public RowData getRow(int pos, int numFields) { + return isNullAt(pos) ? 
null : getStructRowData(pos, numFields); + } + + private StructRowData getStructRowData(int pos, int numFields) { + return new StructRowData( + type.fields().get(pos).type().asStructType(), struct.get(pos, StructLike.class)); + } + + private Object covertFromStructToRowData(Type elementType, Object value) { + switch (elementType.typeId()) { + case BOOLEAN: + case INTEGER: + case DATE: + case TIME: + case LONG: + case FLOAT: + case DOUBLE: + case DECIMAL: + return value; + case TIMESTAMP: + return TimestampData.fromEpochMillis( + (Long) value / 1000, (int) ((Long) value % 1000) * 1000); + case STRING: + return StringData.fromString(value.toString()); + case FIXED: + case BINARY: + return ByteBuffers.toByteArray((ByteBuffer) value); + case STRUCT: + return new StructRowData(elementType.asStructType(), (StructLike) value); + case LIST: + List<?> list = (List<?>) value; + Object[] array = new Object[list.size()]; + + int index = 0; + for (Object element : list) { + if (element == null) { + array[index] = null; + } else { + array[index] = + covertFromStructToRowData(elementType.asListType().elementType(), element); + } + + index += 1; + } + return new GenericArrayData(array); + case MAP: + Types.MapType mapType = elementType.asMapType(); + // make a defensive copy to ensure entries do not change + List<Map.Entry<?, ?>> entries = ImmutableList.copyOf(((Map<?, ?>) value).entrySet()); + Review Comment: nit: no empty line needed here. ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ########## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.data; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; + +@Internal +public class StructRowData implements RowData { + private final Types.StructType type; + private RowKind kind; + private StructLike struct; + + public StructRowData(Types.StructType type) { + this(type, RowKind.INSERT); + } + + public StructRowData(Types.StructType type, RowKind kind) { + this(type, null, kind); + } + + private StructRowData(Types.StructType type, StructLike 
struct) { + this(type, struct, RowKind.INSERT); + } + + private StructRowData(Types.StructType type, StructLike struct, RowKind kind) { + this.type = type; + this.struct = struct; + this.kind = kind; + } + + public StructRowData setStruct(StructLike newStruct) { + this.struct = newStruct; + return this; + } + + @Override + public int getArity() { + return struct.size(); + } + + @Override + public RowKind getRowKind() { + return kind; + } + + @Override + public void setRowKind(RowKind newKind) { + Preconditions.checkNotNull(newKind, "kind can not be null"); + this.kind = newKind; + } + + @Override + public boolean isNullAt(int pos) { + return struct.get(pos, Object.class) == null; + } + + @Override + public boolean getBoolean(int pos) { + return struct.get(pos, Boolean.class); + } + + @Override + public byte getByte(int pos) { + return (byte) (int) struct.get(pos, Integer.class); + } + + @Override + public short getShort(int pos) { + return (short) (int) struct.get(pos, Integer.class); + } + + @Override + public int getInt(int pos) { + Object integer = struct.get(pos, Object.class); + + if (integer instanceof Integer) { + return (int) integer; + } else if (integer instanceof LocalDate) { + return (int) ((LocalDate) integer).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for int field. Type name: " + integer.getClass().getName()); + } + } + + @Override + public long getLong(int pos) { + Object longVal = struct.get(pos, Object.class); + + if (longVal instanceof Long) { + return (long) longVal; + } else if (longVal instanceof OffsetDateTime) { + return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000; + } else if (longVal instanceof LocalDate) { + return ((LocalDate) longVal).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for long field. 
Type name: " + longVal.getClass().getName()); + } + } + + @Override + public float getFloat(int pos) { + return struct.get(pos, Float.class); + } + + @Override + public double getDouble(int pos) { + return struct.get(pos, Double.class); + } + + @Override + public StringData getString(int pos) { + return isNullAt(pos) ? null : getStringDataInternal(pos); + } + + private StringData getStringDataInternal(int pos) { + CharSequence seq = struct.get(pos, CharSequence.class); + return StringData.fromString(seq.toString()); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return isNullAt(pos) + ? null + : DecimalData.fromBigDecimal(getDecimalInternal(pos), precision, scale); + } + + private BigDecimal getDecimalInternal(int pos) { + return struct.get(pos, BigDecimal.class); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + long timeLong = getLong(pos); + return TimestampData.fromEpochMillis(timeLong / 1000, (int) (timeLong % 1000) * 1000); + } + + @Override + public <T> RawValueData<T> getRawValue(int pos) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public byte[] getBinary(int pos) { + return isNullAt(pos) ? null : getBinaryInternal(pos); + } + + private byte[] getBinaryInternal(int pos) { + Object bytes = struct.get(pos, Object.class); + + // should only be either ByteBuffer or byte[] + if (bytes instanceof ByteBuffer) { + return ByteBuffers.toByteArray((ByteBuffer) bytes); + } else if (bytes instanceof byte[]) { + return (byte[]) bytes; + } else { + throw new IllegalStateException( + "Unknown type for binary field. Type name: " + bytes.getClass().getName()); + } + } + + @Override + public ArrayData getArray(int pos) { + return isNullAt(pos) + ? null + : (ArrayData) + covertFromStructToRowData( + type.fields().get(pos).type().asListType(), struct.get(pos, List.class)); + } + + @Override + public MapData getMap(int pos) { + return isNullAt(pos) + ? 
null + : (MapData) + covertFromStructToRowData( + type.fields().get(pos).type().asMapType(), struct.get(pos, Map.class)); + } + + @Override + public RowData getRow(int pos, int numFields) { + return isNullAt(pos) ? null : getStructRowData(pos, numFields); + } + + private StructRowData getStructRowData(int pos, int numFields) { + return new StructRowData( + type.fields().get(pos).type().asStructType(), struct.get(pos, StructLike.class)); + } + + private Object covertFromStructToRowData(Type elementType, Object value) { + switch (elementType.typeId()) { + case BOOLEAN: + case INTEGER: + case DATE: + case TIME: + case LONG: + case FLOAT: + case DOUBLE: + case DECIMAL: + return value; + case TIMESTAMP: + return TimestampData.fromEpochMillis( + (Long) value / 1000, (int) ((Long) value % 1000) * 1000); + case STRING: + return StringData.fromString(value.toString()); + case FIXED: + case BINARY: + return ByteBuffers.toByteArray((ByteBuffer) value); + case STRUCT: + return new StructRowData(elementType.asStructType(), (StructLike) value); + case LIST: + List<?> list = (List<?>) value; + Object[] array = new Object[list.size()]; + + int index = 0; + for (Object element : list) { + if (element == null) { + array[index] = null; + } else { + array[index] = + covertFromStructToRowData(elementType.asListType().elementType(), element); + } + + index += 1; + } Review Comment: nit: Iceberg coding style requires empty line after control block. ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ########## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.data; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; + +@Internal +public class StructRowData implements RowData { + private final Types.StructType type; + private RowKind kind; + private StructLike struct; + + public StructRowData(Types.StructType type) { + this(type, RowKind.INSERT); + } + + public StructRowData(Types.StructType type, RowKind kind) { + this(type, null, kind); + } + + private StructRowData(Types.StructType type, StructLike 
struct) { + this(type, struct, RowKind.INSERT); + } + + private StructRowData(Types.StructType type, StructLike struct, RowKind kind) { + this.type = type; + this.struct = struct; + this.kind = kind; + } + + public StructRowData setStruct(StructLike newStruct) { + this.struct = newStruct; + return this; + } + + @Override + public int getArity() { + return struct.size(); + } + + @Override + public RowKind getRowKind() { + return kind; + } + + @Override + public void setRowKind(RowKind newKind) { + Preconditions.checkNotNull(newKind, "kind can not be null"); + this.kind = newKind; + } + + @Override + public boolean isNullAt(int pos) { + return struct.get(pos, Object.class) == null; + } + + @Override + public boolean getBoolean(int pos) { + return struct.get(pos, Boolean.class); + } + + @Override + public byte getByte(int pos) { + return (byte) (int) struct.get(pos, Integer.class); + } + + @Override + public short getShort(int pos) { + return (short) (int) struct.get(pos, Integer.class); + } + + @Override + public int getInt(int pos) { + Object integer = struct.get(pos, Object.class); + + if (integer instanceof Integer) { + return (int) integer; + } else if (integer instanceof LocalDate) { + return (int) ((LocalDate) integer).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for int field. Type name: " + integer.getClass().getName()); + } + } + + @Override + public long getLong(int pos) { + Object longVal = struct.get(pos, Object.class); + + if (longVal instanceof Long) { + return (long) longVal; + } else if (longVal instanceof OffsetDateTime) { + return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000; + } else if (longVal instanceof LocalDate) { + return ((LocalDate) longVal).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for long field. 
Type name: " + longVal.getClass().getName()); + } + } + + @Override + public float getFloat(int pos) { + return struct.get(pos, Float.class); + } + + @Override + public double getDouble(int pos) { + return struct.get(pos, Double.class); + } + + @Override + public StringData getString(int pos) { + return isNullAt(pos) ? null : getStringDataInternal(pos); + } + + private StringData getStringDataInternal(int pos) { + CharSequence seq = struct.get(pos, CharSequence.class); + return StringData.fromString(seq.toString()); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return isNullAt(pos) + ? null + : DecimalData.fromBigDecimal(getDecimalInternal(pos), precision, scale); + } + + private BigDecimal getDecimalInternal(int pos) { + return struct.get(pos, BigDecimal.class); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + long timeLong = getLong(pos); + return TimestampData.fromEpochMillis(timeLong / 1000, (int) (timeLong % 1000) * 1000); + } + + @Override + public <T> RawValueData<T> getRawValue(int pos) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public byte[] getBinary(int pos) { + return isNullAt(pos) ? null : getBinaryInternal(pos); + } + + private byte[] getBinaryInternal(int pos) { + Object bytes = struct.get(pos, Object.class); + + // should only be either ByteBuffer or byte[] + if (bytes instanceof ByteBuffer) { + return ByteBuffers.toByteArray((ByteBuffer) bytes); + } else if (bytes instanceof byte[]) { + return (byte[]) bytes; + } else { + throw new IllegalStateException( + "Unknown type for binary field. Type name: " + bytes.getClass().getName()); + } + } + + @Override + public ArrayData getArray(int pos) { + return isNullAt(pos) + ? null + : (ArrayData) + covertFromStructToRowData( + type.fields().get(pos).type().asListType(), struct.get(pos, List.class)); + } + + @Override + public MapData getMap(int pos) { + return isNullAt(pos) + ? 
null + : (MapData) + covertFromStructToRowData( + type.fields().get(pos).type().asMapType(), struct.get(pos, Map.class)); + } + + @Override + public RowData getRow(int pos, int numFields) { + return isNullAt(pos) ? null : getStructRowData(pos, numFields); + } + + private StructRowData getStructRowData(int pos, int numFields) { + return new StructRowData( + type.fields().get(pos).type().asStructType(), struct.get(pos, StructLike.class)); + } + + private Object covertFromStructToRowData(Type elementType, Object value) { + switch (elementType.typeId()) { + case BOOLEAN: + case INTEGER: + case DATE: + case TIME: + case LONG: + case FLOAT: + case DOUBLE: + case DECIMAL: + return value; + case TIMESTAMP: + return TimestampData.fromEpochMillis( + (Long) value / 1000, (int) ((Long) value % 1000) * 1000); + case STRING: + return StringData.fromString(value.toString()); + case FIXED: + case BINARY: + return ByteBuffers.toByteArray((ByteBuffer) value); + case STRUCT: + return new StructRowData(elementType.asStructType(), (StructLike) value); + case LIST: + List<?> list = (List<?>) value; + Object[] array = new Object[list.size()]; + + int index = 0; + for (Object element : list) { + if (element == null) { + array[index] = null; + } else { + array[index] = + covertFromStructToRowData(elementType.asListType().elementType(), element); + } + + index += 1; + } + return new GenericArrayData(array); + case MAP: + Types.MapType mapType = elementType.asMapType(); + // make a defensive copy to ensure entries do not change + List<Map.Entry<?, ?>> entries = ImmutableList.copyOf(((Map<?, ?>) value).entrySet()); + + Map<Object, Object> result = Maps.newHashMap(); + for (Map.Entry<?, ?> entry : entries) { + final Object keyValue = covertFromStructToRowData(mapType.keyType(), entry.getKey()); + final Object valueValue = + covertFromStructToRowData(mapType.valueType(), entry.getValue()); + result.put(keyValue, valueValue); + } Review Comment: nit: iceberg coding style needs an empty line here 
(after control block). ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java: ########## @@ -295,6 +299,161 @@ private static void assertEquals( } } + public static void assertEqualsSafe( + Schema schema, List<GenericData.Record> recs, List<Row> rows) { + Streams.forEachPair( + recs.stream(), rows.stream(), (rec, row) -> assertEqualsSafe(schema, rec, row)); + } + + public static void assertEqualsSafe(Schema schema, GenericData.Record rec, Row row) { + List<Types.NestedField> fields = schema.asStruct().fields(); + RowType rowType = FlinkSchemaUtil.convert(schema); + for (int i = 0; i < fields.size(); i += 1) { Review Comment: nit: use `++i`? ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java: ########## @@ -295,6 +299,161 @@ private static void assertEquals( } } + public static void assertEqualsSafe( + Schema schema, List<GenericData.Record> recs, List<Row> rows) { + Streams.forEachPair( + recs.stream(), rows.stream(), (rec, row) -> assertEqualsSafe(schema, rec, row)); + } + + public static void assertEqualsSafe(Schema schema, GenericData.Record rec, Row row) { + List<Types.NestedField> fields = schema.asStruct().fields(); Review Comment: Maybe first assert that the record and the row have the same size. ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java: ########## @@ -0,0 +1,836 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import java.io.IOException; +import java.time.Instant; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.avro.generic.GenericData; +import org.apache.commons.collections.ListUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.types.Row; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.MetadataTableUtils; +import org.apache.iceberg.MetricsUtil; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.FileHelpers; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.flink.FlinkCatalogTestBase; +import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.io.CloseableIterable; +import 
org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.SnapshotUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestFlinkMetaDataTable extends FlinkCatalogTestBase { + private static final String TABLE_NAME = "test_table"; + private final FileFormat format = FileFormat.AVRO; + private static final TemporaryFolder TEMP = new TemporaryFolder(); + private final boolean isPartition; + + public TestFlinkMetaDataTable(String catalogName, Namespace baseNamespace, Boolean isPartition) { + super(catalogName, baseNamespace); + this.isPartition = isPartition; + } + + @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, isPartition={2}") + public static Iterable<Object[]> parameters() { + List<Object[]> parameters = Lists.newArrayList(); + + for (Boolean isPartition : new Boolean[] {true, false}) { + String catalogName = "testhadoop"; + Namespace baseNamespace = Namespace.of("default"); + parameters.add(new Object[] {catalogName, baseNamespace, isPartition}); + } + return parameters; + } + + @Override + protected TableEnvironment getTableEnv() { + Configuration configuration = super.getTableEnv().getConfig().getConfiguration(); + configuration.set(CoreOptions.DEFAULT_PARALLELISM, 1); + return super.getTableEnv(); + } + + @Before + public void before() { + super.before(); + sql("USE CATALOG %s", catalogName); + sql("CREATE DATABASE %s", flinkDatabase); + sql("USE %s", DATABASE); + if (isPartition) { + sql( + "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE) PARTITIONED BY (data) WITH ('format-version'='2', 'write.format.default'='%s')", + TABLE_NAME, format.name()); + 
Review Comment: nit: no need for empty line ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java: ########## @@ -295,6 +299,161 @@ private static void assertEquals( } } + public static void assertEqualsSafe( + Schema schema, List<GenericData.Record> recs, List<Row> rows) { + Streams.forEachPair( + recs.stream(), rows.stream(), (rec, row) -> assertEqualsSafe(schema, rec, row)); + } + + public static void assertEqualsSafe(Schema schema, GenericData.Record rec, Row row) { + List<Types.NestedField> fields = schema.asStruct().fields(); + RowType rowType = FlinkSchemaUtil.convert(schema); + for (int i = 0; i < fields.size(); i += 1) { + Type fieldType = fields.get(i).type(); + Object expectedValue = rec.get(i); + Object actualValue = row.getField(i); + LogicalType logicalType = rowType.getTypeAt(i); + assertAvroEquals(fieldType, logicalType, expectedValue, actualValue); + } + } + + private static void assertEqualsSafe(Types.StructType struct, GenericData.Record rec, Row row) { + List<Types.NestedField> fields = struct.fields(); + for (int i = 0; i < fields.size(); i += 1) { + Type fieldType = fields.get(i).type(); + Object expectedValue = rec.get(i); + Object actualValue = row.getField(i); + assertAvroEquals(fieldType, null, expectedValue, actualValue); + } + } + + private static void assertAvroEquals( + Type type, LogicalType logicalType, Object expected, Object actual) { + + if (expected == null && actual == null) { + return; + } + + Assert.assertTrue( + "expected and actual should be both null or not null", expected != null && actual != null); + + switch (type.typeId()) { + case BOOLEAN: + Assert.assertEquals("boolean value should be equal", expected, actual); + break; + case INTEGER: + Assert.assertEquals("int value should be equal", expected, actual); + break; + case LONG: + Assert.assertEquals("long value should be equal", expected, actual); + break; + case FLOAT: + Assert.assertEquals("float value should be equal", expected, 
actual); + break; + case DOUBLE: + Assert.assertEquals("double value should be equal", expected, actual); + break; + case STRING: + Assertions.assertThat(expected) + .as("Should expect a CharSequence") + .isInstanceOf(CharSequence.class); + Assert.assertEquals("string should be equal", String.valueOf(expected), actual.toString()); + break; + case DATE: + Assertions.assertThat(expected).as("Should expect a Date").isInstanceOf(LocalDate.class); Review Comment: @hililiwei I'm not sure how we are going to test the `assertEquals` change in this PR. For the test data generator PR #6377, the internal version also has Avro GenericRecord support. I removed it in PR #6377 since I thought upstream didn't need Avro Record. Should I add Avro GenericRecord support to PR #6377? ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java: ########## @@ -295,6 +299,161 @@ private static void assertEquals( } } + public static void assertEqualsSafe( + Schema schema, List<GenericData.Record> recs, List<Row> rows) { + Streams.forEachPair( + recs.stream(), rows.stream(), (rec, row) -> assertEqualsSafe(schema, rec, row)); + } + + public static void assertEqualsSafe(Schema schema, GenericData.Record rec, Row row) { + List<Types.NestedField> fields = schema.asStruct().fields(); + RowType rowType = FlinkSchemaUtil.convert(schema); + for (int i = 0; i < fields.size(); i += 1) { + Type fieldType = fields.get(i).type(); + Object expectedValue = rec.get(i); + Object actualValue = row.getField(i); + LogicalType logicalType = rowType.getTypeAt(i); + assertAvroEquals(fieldType, logicalType, expectedValue, actualValue); + } + } + + private static void assertEqualsSafe(Types.StructType struct, GenericData.Record rec, Row row) { + List<Types.NestedField> fields = struct.fields(); + for (int i = 0; i < fields.size(); i += 1) { + Type fieldType = fields.get(i).type(); + Object expectedValue = rec.get(i); + Object actualValue = row.getField(i); + assertAvroEquals(fieldType, null, 
expectedValue, actualValue); + } + } + + private static void assertAvroEquals( + Type type, LogicalType logicalType, Object expected, Object actual) { + + if (expected == null && actual == null) { + return; + } + + Assert.assertTrue( + "expected and actual should be both null or not null", expected != null && actual != null); + + switch (type.typeId()) { + case BOOLEAN: + Assert.assertEquals("boolean value should be equal", expected, actual); + break; + case INTEGER: + Assert.assertEquals("int value should be equal", expected, actual); + break; + case LONG: + Assert.assertEquals("long value should be equal", expected, actual); + break; + case FLOAT: + Assert.assertEquals("float value should be equal", expected, actual); + break; + case DOUBLE: + Assert.assertEquals("double value should be equal", expected, actual); + break; + case STRING: + Assertions.assertThat(expected) + .as("Should expect a CharSequence") + .isInstanceOf(CharSequence.class); + Assert.assertEquals("string should be equal", String.valueOf(expected), actual.toString()); Review Comment: nit: `String.valueOf(expected)` -> `expected.toString()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org