stevenzwu commented on code in PR #6222: URL: https://github.com/apache/iceberg/pull/6222#discussion_r1040393766
########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java: ########## @@ -295,6 +299,161 @@ private static void assertEquals( } } + public static void assertEqualsSafe( + Schema schema, List<GenericData.Record> recs, List<Row> rows) { Review Comment: why do we need the helper methods with Avro generic record? ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ########## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.data; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Function; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; + +public class StructRowData implements RowData { + private final Types.StructType type; + private RowKind kind; + private StructLike struct; + + public StructRowData(Types.StructType type) { + this(type, RowKind.INSERT); + } + + public StructRowData(Types.StructType type, RowKind kind) { + this(type, null, kind); + } + + private StructRowData(Types.StructType type, StructLike struct) { + this(type, struct, RowKind.INSERT); + } + + private StructRowData(Types.StructType type, StructLike struct, RowKind kind) { + this.type = type; + this.struct = struct; + this.kind = kind; + } + + public StructRowData setStruct(StructLike newStruct) { + this.struct = newStruct; + return this; + } + + @Override + public int getArity() { + return struct.size(); + } + + @Override + public RowKind getRowKind() { + return kind; + } + + @Override + public void setRowKind(RowKind newKind) { + Preconditions.checkNotNull(newKind, "kind can not be null"); + this.kind = newKind; + } + + @Override + public boolean isNullAt(int pos) { + return struct.get(pos, Object.class) == null; + } + + @Override + public boolean getBoolean(int pos) { + return struct.get(pos, Boolean.class); + } + + @Override + public byte getByte(int pos) { + return (byte) (int) struct.get(pos, Integer.class); + } + + @Override + public short getShort(int pos) { + return (short) (int) struct.get(pos, Integer.class); + } + + @Override + public int getInt(int pos) { + Object integer = struct.get(pos, Object.class); + + if (integer instanceof Integer) { + return (int) integer; + } else if (integer instanceof LocalDate) { + return (int) ((LocalDate) integer).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for int field. Type name: " + integer.getClass().getName()); + } + } + + @Override + public long getLong(int pos) { + Object longVal = struct.get(pos, Object.class); + + if (longVal instanceof Long) { + return (long) longVal; + } else if (longVal instanceof OffsetDateTime) { + return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000; + } else if (longVal instanceof LocalDate) { + return ((LocalDate) longVal).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for long field. Type name: " + longVal.getClass().getName()); + } + } + + @Override + public float getFloat(int pos) { + return struct.get(pos, Float.class); + } + + @Override + public double getDouble(int pos) { + return struct.get(pos, Double.class); + } + + @Override + public StringData getString(int pos) { + return isNullAt(pos) ? null : getStringDataInternal(pos); + } + + private StringData getStringDataInternal(int pos) { + CharSequence seq = struct.get(pos, CharSequence.class); + return StringData.fromString(seq.toString()); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return isNullAt(pos) + ? null + : DecimalData.fromBigDecimal(getDecimalInternal(pos), precision, scale); + } + + private BigDecimal getDecimalInternal(int pos) { + return struct.get(pos, BigDecimal.class); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + long timeLong = getLong(pos); + if (precision == 6) { + int nanosOfMillisecond = (int) (timeLong % 1000); + return TimestampData.fromEpochMillis(timeLong / 1000, nanosOfMillisecond); + } else { + return TimestampData.fromEpochMillis(timeLong); + } + } + + @Override + public <T> RawValueData<T> getRawValue(int pos) { + return RawValueData.fromBytes(getBinary(pos)); + } + + @Override + public byte[] getBinary(int pos) { + return isNullAt(pos) ? null : getBinaryInternal(pos); + } + + private byte[] getBinaryInternal(int pos) { + Object bytes = struct.get(pos, Object.class); + + // should only be either ByteBuffer or byte[] + if (bytes instanceof ByteBuffer) { + return ByteBuffers.toByteArray((ByteBuffer) bytes); + } else if (bytes instanceof byte[]) { + return (byte[]) bytes; + } else { + throw new IllegalStateException( + "Unknown type for binary field. Type name: " + bytes.getClass().getName()); + } + } + + @Override + public ArrayData getArray(int pos) { + return isNullAt(pos) ? null : getArrayInternal(pos); + } + + private ArrayData getArrayInternal(int pos) { + return collectionToArrayData( + type.fields().get(pos).type().asListType().elementType(), + struct.get(pos, Collection.class)); + } + + @Override + public MapData getMap(int pos) { + return isNullAt(pos) ? null : getMapInternal(pos); + } + + private MapData getMapInternal(int pos) { + return mapToMapData(type.fields().get(pos).type().asMapType(), struct.get(pos, Map.class)); + } + + @Override + public RowData getRow(int pos, int numFields) { + return isNullAt(pos) ? null : getStructRowData(pos, numFields); + } + + private StructRowData getStructRowData(int ordinal, int numFields) { + return new StructRowData( + type.fields().get(ordinal).type().asStructType(), struct.get(ordinal, StructLike.class)); + } + + private MapData mapToMapData(Types.MapType mapType, Map<?, ?> map) { + // make a defensive copy to ensure entries do not change + List<Map.Entry<?, ?>> entries = ImmutableList.copyOf(map.entrySet()); + + ArrayData keyArray = Review Comment: ok. I got it that we need run the conversion recursively (e.g. for StringData key or RowData value). I like to propose a diff structure for this class. We can repurpose `collectionToArrayData` to a `convertFromStructToRowData(Type type, Object value)` method. If `type` is most primitive, it should be simple pass-thru. for other types (like timestamp, struct, map, array) there are some conversions to Flink types (like TimestampData, RowData, MapData, ArrayData). Then for the map, we don't have to convert to key and value array. I believe the reason you did it is to reuse the `collectionToArrayData`. ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java: ########## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.data; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Function; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; + +public class StructRowData implements RowData { + private final Types.StructType type; + private RowKind kind; + private StructLike struct; + + public StructRowData(Types.StructType type) { + this(type, RowKind.INSERT); + } + + public StructRowData(Types.StructType type, RowKind kind) { + this(type, null, kind); + } + + private StructRowData(Types.StructType type, StructLike struct) { + this(type, struct, RowKind.INSERT); + } + + private StructRowData(Types.StructType type, StructLike struct, RowKind kind) { + this.type = type; + this.struct = struct; + this.kind = kind; + } + + public StructRowData setStruct(StructLike newStruct) { + this.struct = newStruct; + return this; + } + + @Override + public int getArity() { + return struct.size(); + } + + @Override + public RowKind getRowKind() { + return kind; + } + + @Override + public void setRowKind(RowKind newKind) { + Preconditions.checkNotNull(newKind, "kind can not be null"); + this.kind = newKind; + } + + @Override + public boolean isNullAt(int pos) { + return struct.get(pos, Object.class) == null; + } + + @Override + public boolean getBoolean(int pos) { + return struct.get(pos, Boolean.class); + } + + @Override + public byte getByte(int pos) { + return (byte) (int) struct.get(pos, Integer.class); + } + + @Override + public short getShort(int pos) { + return (short) (int) struct.get(pos, Integer.class); + } + + @Override + public int getInt(int pos) { + Object integer = struct.get(pos, Object.class); + + if (integer instanceof Integer) { + return (int) integer; + } else if (integer instanceof LocalDate) { + return (int) ((LocalDate) integer).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for int field. Type name: " + integer.getClass().getName()); + } + } + + @Override + public long getLong(int pos) { + Object longVal = struct.get(pos, Object.class); + + if (longVal instanceof Long) { + return (long) longVal; + } else if (longVal instanceof OffsetDateTime) { + return Duration.between(Instant.EPOCH, (OffsetDateTime) longVal).toNanos() / 1000; + } else if (longVal instanceof LocalDate) { + return ((LocalDate) longVal).toEpochDay(); + } else { + throw new IllegalStateException( + "Unknown type for long field. Type name: " + longVal.getClass().getName()); + } + } + + @Override + public float getFloat(int pos) { + return struct.get(pos, Float.class); + } + + @Override + public double getDouble(int pos) { + return struct.get(pos, Double.class); + } + + @Override + public StringData getString(int pos) { + return isNullAt(pos) ? null : getStringDataInternal(pos); + } + + private StringData getStringDataInternal(int pos) { + CharSequence seq = struct.get(pos, CharSequence.class); + return StringData.fromString(seq.toString()); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return isNullAt(pos) + ? null + : DecimalData.fromBigDecimal(getDecimalInternal(pos), precision, scale); + } + + private BigDecimal getDecimalInternal(int pos) { + return struct.get(pos, BigDecimal.class); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + long timeLong = getLong(pos); Review Comment: I think we can simply call this `Timestamp` constructor ``` private TimestampData(long millisecond, int nanoOfMillisecond) { Preconditions.checkArgument(nanoOfMillisecond >= 0 && nanoOfMillisecond <= 999_999); this.millisecond = millisecond; this.nanoOfMillisecond = nanoOfMillisecond; } ``` We can ignore the `precision` arg like `ArrowTimestampColumnVector` from Flink. ``` @Override public TimestampData getTimestamp(int i, int precision) { if (valueVector instanceof TimeStampSecVector) { return TimestampData.fromEpochMillis(((TimeStampSecVector) valueVector).get(i) * 1000); } else if (valueVector instanceof TimeStampMilliVector) { return TimestampData.fromEpochMillis(((TimeStampMilliVector) valueVector).get(i)); } else if (valueVector instanceof TimeStampMicroVector) { long micros = ((TimeStampMicroVector) valueVector).get(i); return TimestampData.fromEpochMillis(micros / 1000, (int) (micros % 1000) * 1000); } else { long nanos = ((TimeStampNanoVector) valueVector).get(i); return TimestampData.fromEpochMillis(nanos / 1_000_000, (int) (nanos % 1_000_000)); } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org