liurenjie1024 commented on code in PR #12774: URL: https://github.com/apache/iceberg/pull/12774#discussion_r2045976962
########## data/src/main/java/org/apache/iceberg/data/ObjectModelRegistry.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.AppenderBuilder; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.ObjectModel; +import org.apache.iceberg.io.ReadBuilder; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Registry which provides the available {@link ReadBuilder}s and writer builders ({@link + * org.apache.iceberg.data.AppenderBuilder}, {@link DataWriterBuilder}, {@link + * EqualityDeleteWriterBuilder}, {@link PositionDeleteWriterBuilder}). 
Based on the `file format` + * and the requested `object model name` the registry returns the correct reader and writer + * builders. These builders could be used to generate the readers and writers. + * + * <p>The available {@link ObjectModel}s are registered by the {@link + * #registerObjectModel(ObjectModel)} method. These {@link ObjectModel}s will be used to create the + * {@link ReadBuilder}s and the {@link AppenderBuilder}s. The former ones are returned directly, the + * later ones are wrapped in the appropriate writer builder implementations. + */ +public final class ObjectModelRegistry { + private static final Logger LOG = LoggerFactory.getLogger(ObjectModelRegistry.class); + // The list of classes which are used for registering the reader and writer builders + private static final List<String> CLASSES_TO_REGISTER = ImmutableList.of(); + + private static final Map<Key, ObjectModel<?>> OBJECT_MODELS = Maps.newConcurrentMap(); + + /** + * Registers a new object model. + * + * @param objectModel the object model + * @throws IllegalArgumentException if an object model for the given {@code format} and {@code + * objectModelName} combination already exists + */ + @SuppressWarnings("CatchBlockLogException") + public static void registerObjectModel(ObjectModel<?> objectModel) { Review Comment: This implementation seems odd to me: it throws an exception to avoid duplicate registration. How about something like this: ``` val ret = OBJECT_MODELS.putIfAbsent(k, v); if (ret != null) { log.info(...) } ``` ########## data/src/main/java/org/apache/iceberg/data/ObjectModelRegistry.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.AppenderBuilder; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.ObjectModel; +import org.apache.iceberg.io.ReadBuilder; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Registry which provides the available {@link ReadBuilder}s and writer builders ({@link + * org.apache.iceberg.data.AppenderBuilder}, {@link DataWriterBuilder}, {@link + * EqualityDeleteWriterBuilder}, {@link PositionDeleteWriterBuilder}). Based on the `file format` + * and the requested `object model name` the registry returns the correct reader and writer + * builders. These builders could be used to generate the readers and writers. + * + * <p>The available {@link ObjectModel}s are registered by the {@link + * #registerObjectModel(ObjectModel)} method. 
These {@link ObjectModel}s will be used to create the + * {@link ReadBuilder}s and the {@link AppenderBuilder}s. The former ones are returned directly, the + * later ones are wrapped in the appropriate writer builder implementations. + */ +public final class ObjectModelRegistry { + private static final Logger LOG = LoggerFactory.getLogger(ObjectModelRegistry.class); + // The list of classes which are used for registering the reader and writer builders + private static final List<String> CLASSES_TO_REGISTER = ImmutableList.of(); + + private static final Map<Key, ObjectModel<?>> OBJECT_MODELS = Maps.newConcurrentMap(); Review Comment: ```suggestion private static final ConcurrentMap<Key, ObjectModel<?>> OBJECT_MODELS = Maps.newConcurrentMap(); ``` If we want to allow registering non-predefined formats at runtime, shouldn't it be a concurrent map? ########## data/src/main/java/org/apache/iceberg/data/PositionDeleteWriterBuilder.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.data; + +import java.io.IOException; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.Schema; +import org.apache.iceberg.deletes.PositionDeleteWriter; + +/** + * Builder for generating an {@link PositionDeleteWriter}. + * + * @param <B> type of the builder + * @param <E> engine specific schema of the input records used for appender initialization + */ +public interface PositionDeleteWriterBuilder<B extends PositionDeleteWriterBuilder<B, E>, E> + extends FileWriterBuilderBase<B, E> { + /** Sets the row schema for the delete writers. */ + B withRowSchema(Schema newSchema); + Review Comment: The input for position deletes may be sorted; should we also have a config key for that case? ########## data/src/main/java/org/apache/iceberg/data/ObjectModelRegistry.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.data; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.AppenderBuilder; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.ObjectModel; +import org.apache.iceberg.io.ReadBuilder; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Registry which provides the available {@link ReadBuilder}s and writer builders ({@link + * org.apache.iceberg.data.AppenderBuilder}, {@link DataWriterBuilder}, {@link + * EqualityDeleteWriterBuilder}, {@link PositionDeleteWriterBuilder}). Based on the `file format` + * and the requested `object model name` the registry returns the correct reader and writer + * builders. These builders could be used to generate the readers and writers. + * + * <p>The available {@link ObjectModel}s are registered by the {@link + * #registerObjectModel(ObjectModel)} method. These {@link ObjectModel}s will be used to create the + * {@link ReadBuilder}s and the {@link AppenderBuilder}s. The former ones are returned directly, the + * later ones are wrapped in the appropriate writer builder implementations. + */ +public final class ObjectModelRegistry { + private static final Logger LOG = LoggerFactory.getLogger(ObjectModelRegistry.class); + // The list of classes which are used for registering the reader and writer builders + private static final List<String> CLASSES_TO_REGISTER = ImmutableList.of(); + + private static final Map<Key, ObjectModel<?>> OBJECT_MODELS = Maps.newConcurrentMap(); + + /** + * Registers a new object model. 
+ * + * @param objectModel the object model + * @throws IllegalArgumentException if an object model for the given {@code format} and {@code + * objectModelName} combination already exists + */ + @SuppressWarnings("CatchBlockLogException") + public static void registerObjectModel(ObjectModel<?> objectModel) { + try { + Key key = new Key(objectModel.format(), objectModel.name()); + if (OBJECT_MODELS.containsKey(key)) { + throw new IllegalArgumentException( + String.format( + "Object model %s clashes with %s. Both serves %s", + objectModel.getClass(), OBJECT_MODELS.get(key), key)); + } + + OBJECT_MODELS.put(key, objectModel); + } catch (RuntimeException e) { + // failing to register an object model is normal and does not require a stack trace + LOG.info( + "Unable to use register object model {} for data files: {}", objectModel, e.getMessage()); Review Comment: ```suggestion "Unable to register object model {} for data files: {}", objectModel, e.getMessage()); ``` ########## data/src/main/java/org/apache/iceberg/data/AppenderBuilder.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.data; + +import java.io.IOException; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.FileAppender; + +/** + * Builder for generating a {@link FileAppender}. + * + * @param <B> type of the builder + * @param <E> engine specific schema of the input records used for appender initialization + */ +public interface AppenderBuilder<B extends AppenderBuilder<B, E>, E> Review Comment: Do we really need this? ########## data/src/main/java/org/apache/iceberg/data/ObjectModelRegistry.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.data; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.AppenderBuilder; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.ObjectModel; +import org.apache.iceberg.io.ReadBuilder; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Registry which provides the available {@link ReadBuilder}s and writer builders ({@link + * org.apache.iceberg.data.AppenderBuilder}, {@link DataWriterBuilder}, {@link + * EqualityDeleteWriterBuilder}, {@link PositionDeleteWriterBuilder}). Based on the `file format` + * and the requested `object model name` the registry returns the correct reader and writer + * builders. These builders could be used to generate the readers and writers. + * + * <p>The available {@link ObjectModel}s are registered by the {@link + * #registerObjectModel(ObjectModel)} method. These {@link ObjectModel}s will be used to create the + * {@link ReadBuilder}s and the {@link AppenderBuilder}s. The former ones are returned directly, the + * later ones are wrapped in the appropriate writer builder implementations. + */ +public final class ObjectModelRegistry { + private static final Logger LOG = LoggerFactory.getLogger(ObjectModelRegistry.class); + // The list of classes which are used for registering the reader and writer builders + private static final List<String> CLASSES_TO_REGISTER = ImmutableList.of(); + + private static final Map<Key, ObjectModel<?>> OBJECT_MODELS = Maps.newConcurrentMap(); + + /** + * Registers a new object model. 
+ * + * @param objectModel the object model + * @throws IllegalArgumentException if an object model for the given {@code format} and {@code + * objectModelName} combination already exists + */ + @SuppressWarnings("CatchBlockLogException") + public static void registerObjectModel(ObjectModel<?> objectModel) { + try { + Key key = new Key(objectModel.format(), objectModel.name()); + if (OBJECT_MODELS.containsKey(key)) { + throw new IllegalArgumentException( + String.format( + "Object model %s clashes with %s. Both serves %s", + objectModel.getClass(), OBJECT_MODELS.get(key), key)); + } + + OBJECT_MODELS.put(key, objectModel); + } catch (RuntimeException e) { + // failing to register an object model is normal and does not require a stack trace + LOG.info( + "Unable to use register object model {} for data files: {}", objectModel, e.getMessage()); + } + } + + @SuppressWarnings("CatchBlockLogException") + private static void registerSupportedFormats() { + // Uses dynamic methods to call the `register` for the listed classes + for (String classToRegister : CLASSES_TO_REGISTER) { + try { + DynMethods.builder("register").impl(classToRegister).buildStaticChecked().invoke(); + } catch (NoSuchMethodException e) { + // failing to register an object model is normal and does not require a stack trace + LOG.info("Unable to register {} for data files: {}", classToRegister, e.getMessage()); + } + } + } + + static { + registerSupportedFormats(); + } + + private ObjectModelRegistry() {} + + /** + * Provides a reader builder for the given input file which returns objects generated by the given + * object model. 
+ * + * @param format of the file to read + * @param objectModelName name of the object model used to generate the reader + * @param inputFile to read + * @return {@link ReadBuilder} for building the actual reader + */ + public static ReadBuilder<?> readBuilder( + FileFormat format, String objectModelName, InputFile inputFile) { + return OBJECT_MODELS.get(new Key(format, objectModelName)).readBuilder(inputFile); + } + + /** + * Provides an appender builder for the given output file which writes a data file with a given + * file format and expects input records defined by the object model. + * + * @param format of the file to write + * @param objectModelName name of the object model used to generate the writer + * @param outputFile to write + * @param <E> type for the engine specific schema expected by the appender + * @return {@link ReadBuilder} for building the actual reader + */ + public static <E> org.apache.iceberg.data.AppenderBuilder<?, E> appenderBuilder( Review Comment: Do we really need this? ########## core/src/main/java/org/apache/iceberg/io/AppenderBuilder.java: ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.io; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.Schema; + +/** + * Interface which should be implemented by the data file format implementations. The {@link + * AppenderBuilder} will be parametrized based on the user provided configuration and finally the + * {@link AppenderBuilder#build(AppenderBuilder.WriteMode)} method is used to generate the appender + * for the specific writer use-cases. The following input should be handled by the appender in the + * specific modes: + * + * <ul> + * <li>The appender's engine specific input type + * <ul> + * <li>{@link AppenderBuilder.WriteMode#APPENDER} + * <li>{@link AppenderBuilder.WriteMode#DATA_WRITER} + * <li>{@link AppenderBuilder.WriteMode#EQUALITY_DELETE_WRITER} + * </ul> + * <li>{@link org.apache.iceberg.deletes.PositionDelete} where the type of the row is the + * appender's engine specific input type + * <ul> + * <li>{@link AppenderBuilder.WriteMode#POSITION_DELETE_WRITER} + * <li>{@link AppenderBuilder.WriteMode#POSITION_DELETE_WITH_ROW_WRITER} + * </ul> + * </ul> + * + * @param <B> type returned by builder API to allow chained calls + * @param <E> the engine specific schema of the input data + */ +public interface AppenderBuilder<B extends AppenderBuilder<B, E>, E> { + /** Set the file schema. */ + B schema(Schema newSchema); + + /** + * Set a writer configuration property which affects the writer behavior. + * + * @param property a writer config property name + * @param value config value + * @return this for method chaining + */ + B set(String property, String value); + + /** + * Set a file metadata property in the created file. + * + * @param property a file metadata property name + * @param value config value + * @return this for method chaining + */ + B meta(String property, String value); + + /** Sets the metrics configuration used for collecting column metrics for the created file. 
*/ + B metricsConfig(MetricsConfig newMetricsConfig); + + /** Overwrite the file if it already exists. By default, overwrite is disabled. */ + B overwrite(); + + /** + * Overwrite the file if it already exists. The default value is <code>false</code>. + * + * @deprecated Since 1.10.0, will be removed in 1.11.0. Only provided for backward compatibility. + * Use {@link #overwrite()} instead. + */ + @Deprecated + B overwrite(boolean enabled); + + /** + * Sets the encryption key used for writing the file. If encryption is not supported by the reader + * then an exception should be thrown. + */ + default B fileEncryptionKey(ByteBuffer encryptionKey) { + throw new UnsupportedOperationException("Not supported"); + } + + /** + * Sets the additional authentication data prefix used for writing the file. If encryption is not + * supported by the reader then an exception should be thrown. + */ + default B aadPrefix(ByteBuffer aadPrefix) { + throw new UnsupportedOperationException("Not supported"); + } + + /** + * Sets the engine native schema for the input. Defines the input type when there is N to 1 + * mapping between the engine type and the Iceberg type, and providing the Iceberg schema is not + * enough for the conversion. + */ + B engineSchema(E newEngineSchema); + + /** + * Builds the {@link FileAppender} for the configured {@link WriteMode}. Could change several + * use-case specific configurations, like: + * + * <ul> + * <li>Mode specific writer context (typically different for data and delete files). + * <li>Writer functions to accept data rows, or {@link + * org.apache.iceberg.deletes.PositionDelete}s + * </ul> + */ + <D> FileAppender<D> build(WriteMode mode) throws IOException; + + /** + * Writer modes. Based on the mode {@link #build(WriteMode)} could alter the appender + * configuration when creating the {@link FileAppender}. + */ + enum WriteMode { + /** Mode for appending data to a file. */ + APPENDER, Review Comment: How is this different from `DATA_WRITER`? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org