aihuaxu commented on code in PR #12323: URL: https://github.com/apache/iceberg/pull/12323#discussion_r1972563111
########## core/src/main/java/org/apache/iceberg/variants/VariantVisitor.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class VariantVisitor<R> { + public R object(VariantObject object, List<R> fieldResults) { Review Comment: Should we add `List<String> names` as a parameter, like some other visitors do (e.g. `AvroSchemaVisitor`), so that we don't rely on the order of `fieldResults`, or so that we can ignore nulls? ########## parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java: ########## @@ -157,6 +171,16 @@ public ParquetValueWriter<?> primitive(PrimitiveType primitive) { throw new UnsupportedOperationException("Unsupported type: " + primitive); } } + + @Override + public ParquetValueWriter<?> variant(Types.VariantType iVariant, ParquetValueWriter<?> result) { + return result; + } + + @Override + public ParquetVariantVisitor<ParquetValueWriter<?>> variantVisitor() { Review Comment: We can reduce the visibility to `protected` for now. It doesn't seem to be accessed from outside. 
########## core/src/main/java/org/apache/iceberg/variants/VariantVisitor.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; Review Comment: I think this is the same as in the variant reader PR: we are not including arrays, right? ########## parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java: ########## @@ -56,6 +58,19 @@ public class TypeToMessageType { LogicalTypeAnnotation.timestampType(false /* not adjusted to UTC */, TimeUnit.MICROS); private static final LogicalTypeAnnotation TIMESTAMPTZ_MICROS = LogicalTypeAnnotation.timestampType(true /* adjusted to UTC */, TimeUnit.MICROS); + private static final String METADATA = "metadata"; Review Comment: Right now we are defining these constants in multiple places. We don't want them to be in the API, but does it make sense to put them in core `Variants.java`? ########## parquet/src/main/java/org/apache/iceberg/parquet/VariantWriterBuilder.java: ########## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import java.util.Deque; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.variants.PhysicalType; +import org.apache.iceberg.variants.VariantValue; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit; +import 
org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; + +public class VariantWriterBuilder extends ParquetVariantVisitor<ParquetValueWriter<?>> { + private final MessageType schema; + private final Iterable<String> basePath; + private final Deque<String> fieldNames = Lists.newLinkedList(); + + public VariantWriterBuilder(MessageType schema, Iterable<String> basePath) { + this.schema = schema; + this.basePath = basePath; + } + + @Override + public void beforeField(Type type) { + fieldNames.addLast(type.getName()); + } + + @Override + public void afterField(Type type) { + fieldNames.removeLast(); + } + + private String[] currentPath() { + return Streams.concat(Streams.stream(basePath), fieldNames.stream()).toArray(String[]::new); + } + + private String[] path(String... 
names) { + return Streams.concat(Streams.stream(basePath), fieldNames.stream(), Stream.of(names)) + .toArray(String[]::new); + } + + @Override + public ParquetValueWriter<?> variant( + GroupType variant, ParquetValueWriter<?> metadataWriter, ParquetValueWriter<?> valueWriter) { + return ParquetVariantWriters.variant(metadataWriter, valueWriter); + } + + @Override + public ParquetValueWriter<?> metadata(PrimitiveType metadata) { + ColumnDescriptor desc = schema.getColumnDescription(currentPath()); + return ParquetVariantWriters.metadata(ParquetValueWriters.byteBuffers(desc)); + } + + @Override + public ParquetValueWriter<?> serialized(PrimitiveType value) { + ColumnDescriptor desc = schema.getColumnDescription(currentPath()); + return ParquetVariantWriters.value(ParquetValueWriters.byteBuffers(desc)); + } + + @Override + public ParquetValueWriter<?> primitive(PrimitiveType primitive) { + ColumnDescriptor desc = schema.getColumnDescription(currentPath()); + LogicalTypeAnnotation annotation = primitive.getLogicalTypeAnnotation(); + if (annotation != null) { + Optional<ParquetValueWriter<?>> writer = + annotation.accept(new LogicalTypeToVariantWriter(desc)); + if (writer.isPresent()) { + return writer.get(); + } + + } else { + switch (primitive.getPrimitiveTypeName()) { + case BINARY: + return ParquetVariantWriters.primitive( + ParquetValueWriters.byteBuffers(desc), PhysicalType.BINARY); + case BOOLEAN: + return ParquetVariantWriters.primitive( + ParquetValueWriters.booleans(desc), + PhysicalType.BOOLEAN_TRUE, + PhysicalType.BOOLEAN_FALSE); + case INT32: + return ParquetVariantWriters.primitive( + ParquetValueWriters.ints(desc), PhysicalType.INT32); + case INT64: + return ParquetVariantWriters.primitive( + ParquetValueWriters.longs(desc), PhysicalType.INT64); + case FLOAT: + // use an unboxed writer to skip metrics collection that requires an ID + return ParquetVariantWriters.primitive( + ParquetValueWriters.unboxed(desc), PhysicalType.FLOAT); + case DOUBLE: + // use 
an unboxed writer to skip metrics collection that requires an ID + return ParquetVariantWriters.primitive( + ParquetValueWriters.unboxed(desc), PhysicalType.DOUBLE); + } + } + + throw new UnsupportedOperationException("Unsupported shredded value type: " + primitive); + } + + @Override + public ParquetValueWriter<?> value( + GroupType value, ParquetValueWriter<?> valueWriter, ParquetValueWriter<?> typedWriter) { + int valueDL = schema.getMaxDefinitionLevel(path(VALUE)); Review Comment: Rename this to valueDefinitionLevel to be consistent. ########## parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantWriters.java: ########## @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.parquet; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.variants.PhysicalType; +import org.apache.iceberg.variants.ShreddedObject; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantObject; +import org.apache.iceberg.variants.VariantValue; +import org.apache.iceberg.variants.Variants; +import org.apache.parquet.column.ColumnWriteStore; + +class ParquetVariantWriters { + private ParquetVariantWriters() {} + + @SuppressWarnings("unchecked") + static ParquetValueWriter<Variant> variant( + ParquetValueWriter<?> metadataWriter, ParquetValueWriter<?> valueWriter) { + return new VariantWriter( + (ParquetValueWriter<VariantMetadata>) metadataWriter, + (ParquetValueWriter<VariantValue>) valueWriter); + } + + @SuppressWarnings("unchecked") + static ParquetValueWriter<VariantMetadata> metadata(ParquetValueWriter<?> bytesWriter) { + return new VariantMetadataWriter((ParquetValueWriter<ByteBuffer>) bytesWriter); + } + + @SuppressWarnings("unchecked") + static ParquetValueWriter<VariantValue> value(ParquetValueWriter<?> bytesWriter) { + return new VariantValueWriter((ParquetValueWriter<ByteBuffer>) bytesWriter); + } + + static ParquetValueWriter<VariantValue> primitive( + ParquetValueWriter<?> writer, PhysicalType... types) { Review Comment: We accept varargs because of BOOLEAN_TRUE and BOOLEAN_FALSE, and we don't otherwise expect multiple types, right? Can we add a check here to make sure that only one type is passed, except in the boolean case? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org