pvary commented on code in PR #12298: URL: https://github.com/apache/iceberg/pull/12298#discussion_r2001469432
########## core/src/main/java/org/apache/iceberg/io/datafile/DataFileServiceRegistry.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io.datafile; + +import java.util.Map; +import java.util.function.Function; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Registry which provides the available {@link ReadBuilder}s and {@link WriteBuilder}s. Based on + * the `file format`, the required `data type` and the reader/writer `builderType` the registry + * returns the correct reader and writer builders. These builders could be used to generate the + * readers and writers. + * + * <p>File formats has to register the {@link ReadBuilder}s and the {@link AppenderBuilder}s which + * will be used to create the readers and the writers. The readers returned directly, the appenders + * are wrapped into a {@link WriteBuilder} and the {@link + * AppenderBuilder#build(AppenderBuilder.WriteMode)} method is used to finalize the appender + * configuration for the specific writer use-cases. The following inputs should be handled by the + * appender in the following cases: + * + * <ul> + * <li>The appender's native input type - {@link AppenderBuilder.WriteMode#APPENDER}, {@link + * AppenderBuilder.WriteMode#DATA_WRITER}, {@link + * AppenderBuilder.WriteMode#EQUALITY_DELETE_WRITER} + * <li>{@link org.apache.iceberg.deletes.PositionDelete} where the row is the appender's native + * input type - {@link AppenderBuilder.WriteMode#POSITION_DELETE_WRITER}, {@link + * AppenderBuilder.WriteMode#POSITION_DELETE_WITH_ROW_WRITER} + * </ul> + */ +public final class DataFileServiceRegistry { + private static final Logger LOG = LoggerFactory.getLogger(DataFileServiceRegistry.class); + // The list of classes which are used for registering the reader and writer builders + private static final String[] CLASSES_TO_REGISTER = + new String[] { + "org.apache.iceberg.parquet.Parquet", + "org.apache.iceberg.orc.ORC", + "org.apache.iceberg.arrow.vectorized.ArrowReader", + "org.apache.iceberg.flink.source.RowDataFileScanTaskReader", + "org.apache.iceberg.flink.sink.FlinkAppenderFactory", + "org.apache.iceberg.spark.source.DataFileServices" + }; + + private static final Map<Key, Function<EncryptedOutputFile, AppenderBuilder<?, ?>>> + APPENDER_BUILDERS = Maps.newConcurrentMap(); + private static final Map<Key, Function<InputFile, ReadBuilder<?, ?>>> READ_BUILDERS = + Maps.newConcurrentMap(); + + /** Registers a new appender builder for the given format/input type. */ + public static void registerAppender( + FileFormat format, + String inputType, + Function<EncryptedOutputFile, AppenderBuilder<?, ?>> appenderBuilder) { + Key key = new Key(format, inputType, null); + if (APPENDER_BUILDERS.containsKey(key)) { + throw new IllegalArgumentException( + String.format( + "Appender builder %s clashes with %s. Both serves %s", + appenderBuilder.getClass(), APPENDER_BUILDERS.get(key), key)); + } + + APPENDER_BUILDERS.putIfAbsent(key, appenderBuilder); + } + + /** Registers a new reader builder for the given format/input type. */ + public static void registerReader( + FileFormat format, String outputType, Function<InputFile, ReadBuilder<?, ?>> readBuilder) { + registerReader(format, outputType, null, readBuilder); + } + + /** Registers a new reader builder for the given format/input type/reader type. */ + public static void registerReader( + FileFormat format, + String outputType, + String readerType, + Function<InputFile, ReadBuilder<?, ?>> readBuilder) { + Key key = new Key(format, outputType, readerType); + if (READ_BUILDERS.containsKey(key)) { + throw new IllegalArgumentException( + String.format( + "Read builder %s clashes with %s. Both serves %s", + readBuilder.getClass(), READ_BUILDERS.get(key), key)); + } + + READ_BUILDERS.put(new Key(format, outputType, readerType), readBuilder); + } + + @SuppressWarnings("CatchBlockLogException") + private static void registerSupportedFormats() { + Avro.register(); + + // Uses dynamic methods to call the `register` for the listed classes + for (String s : CLASSES_TO_REGISTER) { + try { + DynMethods.StaticMethod register = + DynMethods.builder("register").impl(s).buildStaticChecked(); + + register.invoke(); + + } catch (NoSuchMethodException e) { + // failing to register readers/writers is normal and does not require a stack trace + LOG.info("Unable to register {} for data files: {}", s, e.getMessage()); + } + } + } + + static { + registerSupportedFormats(); + } + + private DataFileServiceRegistry() {} + + /** + * Provides a reader builder for the given input file which returns objects with a given + * returnType. + * + * @param format of the file to read + * @param returnType returned by the reader + * @param inputFile to read + * @param <F> type of the records which are filtered by {@link DeleteFilter} + * @return {@link ReadBuilder} for building the actual reader + */ + public static <F> ReadBuilder<?, F> readBuilder( + FileFormat format, String returnType, InputFile inputFile) { + return readBuilder(format, returnType, null, inputFile); + } + + /** + * Provides a reader builder for the given input file which returns objects with a given + * returnType. + * + * @param format of the file to read + * @param returnType returned by the reader + * @param builderType of the reader builder + * @param inputFile to read + * @param <F> type of the records which are filtered by {@link DeleteFilter} + * @return {@link ReadBuilder} for building the actual reader + */ + public static <F> ReadBuilder<?, F> readBuilder( + FileFormat format, String returnType, String builderType, InputFile inputFile) { + return (ReadBuilder<?, F>) + READ_BUILDERS.get(new Key(format, returnType, builderType)).apply(inputFile); + } + + /** + * Provides a writer builder for the given output file which writes objects with a given + * inputType. + * + * @param format of the file to read + * @param inputType accepted by the writer + * @param outputFile to write + * @param <E> type for the engine specific schema used by the builder + * @return {@link ReadBuilder} for building the actual reader + */ + public static <E> WriteBuilder<?, E> writeBuilder( + FileFormat format, String inputType, EncryptedOutputFile outputFile) { + return new WriteBuilder<>( + (AppenderBuilder<?, E>) + APPENDER_BUILDERS.get(new Key(format, inputType, null)).apply(outputFile), Review Comment: Callers don't need to provide it, but removed because of another comment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org