nastra commented on code in PR #9641:
URL: https://github.com/apache/iceberg/pull/9641#discussion_r1479758952

##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.Temporal;
+import java.util.Base64;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mapping.MappedField;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Type.PrimitiveType;
+import org.apache.iceberg.types.Types.DecimalType;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.kafka.connect.data.Struct;
+
+public class RecordConverter {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  private static final DateTimeFormatter OFFSET_TS_FMT =
+      new DateTimeFormatterBuilder()
+          .append(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
+          .appendOffset("+HHmm", "Z")
+          .toFormatter();
+
+  private final Schema tableSchema;
+  private final NameMapping nameMapping;
+  private final IcebergSinkConfig config;
+  private final Map<Integer, Map<String, NestedField>> structNameMap = Maps.newHashMap();
+
+  public RecordConverter(Table table, IcebergSinkConfig config) {
+    this.tableSchema = table.schema();
+    this.nameMapping = createNameMapping(table);
+    this.config = config;
+  }
+
+  public Record convert(Object data) {
+    return convert(data, null);
+  }
+
+  public Record convert(Object data, SchemaUpdate.Consumer schemaUpdateConsumer) {
+    if (data instanceof Struct || data instanceof Map) {
+      return convertStructValue(data, tableSchema.asStruct(), -1, schemaUpdateConsumer);
+    }
+    throw new UnsupportedOperationException("Cannot convert type: " + data.getClass().getName());
+  }
+
+  private NameMapping createNameMapping(Table table) {
+    String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+    return nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
+  }
+
+  private Object convertValue(
+      Object value, Type type, int fieldId, SchemaUpdate.Consumer schemaUpdateConsumer) {
+    if (value == null) {
+      return null;
+    }
+    switch (type.typeId()) {
+      case STRUCT:
+        return convertStructValue(value, type.asStructType(), fieldId, schemaUpdateConsumer);
+      case LIST:
+        return convertListValue(value, type.asListType(), schemaUpdateConsumer);
+      case MAP:
+        return convertMapValue(value, type.asMapType(), schemaUpdateConsumer);
+      case INTEGER:
+        return convertInt(value);
+      case LONG:
+        return convertLong(value);
+      case FLOAT:
+        return convertFloat(value);
+      case DOUBLE:
+        return convertDouble(value);
+      case DECIMAL:
+        return convertDecimal(value, (DecimalType) type);
+      case BOOLEAN:
+        return convertBoolean(value);

Review Comment:
   booleans don't seem to be tested in the existing test cases
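   A minimal case along these lines could cover it (just a sketch: it assumes a one-column table with a `BooleanType` field, assumes `convertBoolean` passes `Boolean` values through unchanged since its body isn't quoted here, and mirrors the mock-table setup used elsewhere in `RecordConverterTest`; class and method names are illustrative):

   ```java
   import static org.assertj.core.api.Assertions.assertThat;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.when;

   import org.apache.iceberg.Table;
   import org.apache.iceberg.connect.IcebergSinkConfig;
   import org.apache.iceberg.data.Record;
   import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
   import org.apache.iceberg.types.Types;
   import org.apache.iceberg.types.Types.BooleanType;
   import org.junit.jupiter.api.Test;

   public class RecordConverterBooleanTest {

     // single required boolean column; the field id is arbitrary
     private static final org.apache.iceberg.Schema BOOL_SCHEMA =
         new org.apache.iceberg.Schema(Types.NestedField.required(1, "bo", BooleanType.get()));

     @Test
     public void testBooleanConversion() {
       Table table = mock(Table.class);
       when(table.schema()).thenReturn(BOOL_SCHEMA);
       // empty properties, so no name mapping is configured
       when(table.properties()).thenReturn(ImmutableMap.of());

       RecordConverter converter = new RecordConverter(table, mock(IcebergSinkConfig.class));

       // schemaless map input drives convertValue through the BOOLEAN branch
       Record record = converter.convert(ImmutableMap.of("bo", true));
       assertThat(record.getField("bo")).isEqualTo(true);
     }
   }
   ```
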
##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java:
##########
@@ -0,0 +1,828 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.Temporal;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.data.SchemaUpdate.AddColumn;
+import org.apache.iceberg.connect.data.SchemaUpdate.UpdateType;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mapping.MappedField;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.BinaryType;
+import org.apache.iceberg.types.Types.DateType;
+import org.apache.iceberg.types.Types.DecimalType;
+import org.apache.iceberg.types.Types.DoubleType;
+import org.apache.iceberg.types.Types.FloatType;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.types.Types.TimeType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class RecordConverterTest {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  private static final org.apache.iceberg.Schema SCHEMA =
+      new org.apache.iceberg.Schema(
+          Types.NestedField.required(21, "i", IntegerType.get()),
+          Types.NestedField.required(22, "l", LongType.get()),
+          Types.NestedField.required(23, "d", DateType.get()),
+          Types.NestedField.required(24, "t", TimeType.get()),
+          Types.NestedField.required(25, "ts", TimestampType.withoutZone()),
+          Types.NestedField.required(26, "tsz", TimestampType.withZone()),
+          Types.NestedField.required(27, "fl", FloatType.get()),
+          Types.NestedField.required(28, "do", DoubleType.get()),
+          Types.NestedField.required(29, "dec", DecimalType.of(9, 2)),
+          Types.NestedField.required(30, "s", StringType.get()),
+          Types.NestedField.required(31, "u", Types.UUIDType.get()),
+          Types.NestedField.required(32, "f", Types.FixedType.ofLength(3)),
+          Types.NestedField.required(33, "b", BinaryType.get()),
+          Types.NestedField.required(34, "li", ListType.ofRequired(35, StringType.get())),
+          Types.NestedField.required(
+              36, "ma", MapType.ofRequired(37, 38, StringType.get(), StringType.get())),
+          Types.NestedField.optional(39, "extra", StringType.get()));
+
+  // we have 1 unmapped column so exclude that from the count
+  private static final int MAPPED_CNT = SCHEMA.columns().size() - 1;
+
+  private static final org.apache.iceberg.Schema NESTED_SCHEMA =
+      new org.apache.iceberg.Schema(
+          Types.NestedField.required(1, "ii", IntegerType.get()),
+          Types.NestedField.required(2, "st", SCHEMA.asStruct()));
+
+  private static final org.apache.iceberg.Schema SIMPLE_SCHEMA =
+      new org.apache.iceberg.Schema(
+          Types.NestedField.required(1, "ii", IntegerType.get()),
+          Types.NestedField.required(2, "st", StringType.get()));
+
+  private static final org.apache.iceberg.Schema ID_SCHEMA =
+      new org.apache.iceberg.Schema(Types.NestedField.required(1, "ii", IntegerType.get()));
+
+  private static final org.apache.iceberg.Schema STRUCT_IN_LIST_SCHEMA =
+      new org.apache.iceberg.Schema(
+          Types.NestedField.required(
+              100, "stli", ListType.ofRequired(101, NESTED_SCHEMA.asStruct())));
+
+  private static final org.apache.iceberg.Schema STRUCT_IN_LIST_BASIC_SCHEMA =
+      new org.apache.iceberg.Schema(
+          Types.NestedField.required(100, "stli", ListType.ofRequired(101, ID_SCHEMA.asStruct())));
+
+  private static final org.apache.iceberg.Schema STRUCT_IN_MAP_SCHEMA =
+      new org.apache.iceberg.Schema(
+          Types.NestedField.required(
+              100,
+              "stma",
+              MapType.ofRequired(101, 102, StringType.get(), NESTED_SCHEMA.asStruct())));
+
+  private static final org.apache.iceberg.Schema STRUCT_IN_MAP_BASIC_SCHEMA =
+      new org.apache.iceberg.Schema(
+          Types.NestedField.required(
+              100, "stma", MapType.ofRequired(101, 102, StringType.get(), ID_SCHEMA.asStruct())));
+
+  private static final Schema CONNECT_SCHEMA =
+      SchemaBuilder.struct()
+          .field("i", Schema.INT32_SCHEMA)
+          .field("l", Schema.INT64_SCHEMA)
+          .field("d", org.apache.kafka.connect.data.Date.SCHEMA)
+          .field("t", Time.SCHEMA)
+          .field("ts", Timestamp.SCHEMA)
+          .field("tsz", Timestamp.SCHEMA)
+          .field("fl", Schema.FLOAT32_SCHEMA)
+          .field("do", Schema.FLOAT64_SCHEMA)
+          .field("dec", Decimal.schema(2))
+          .field("s", Schema.STRING_SCHEMA)
+          .field("u", Schema.STRING_SCHEMA)
+          .field("f", Schema.BYTES_SCHEMA)
+          .field("b", Schema.BYTES_SCHEMA)
+          .field("li", SchemaBuilder.array(Schema.STRING_SCHEMA))
+          .field("ma", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA));
+
+  private static final Schema CONNECT_NESTED_SCHEMA =
+      SchemaBuilder.struct().field("ii", Schema.INT32_SCHEMA).field("st", CONNECT_SCHEMA);
+
+  private static final Schema CONNECT_STRUCT_IN_LIST_SCHEMA =
+      SchemaBuilder.struct().field("stli", SchemaBuilder.array(CONNECT_NESTED_SCHEMA)).build();
+
+  private static final Schema CONNECT_STRUCT_IN_MAP_SCHEMA =
+      SchemaBuilder.struct()
+          .field("stma", SchemaBuilder.map(Schema.STRING_SCHEMA, CONNECT_NESTED_SCHEMA))
+          .build();
+
+  private static final LocalDate DATE_VAL = LocalDate.parse("2023-05-18");
+  private static final LocalTime TIME_VAL = LocalTime.parse("07:14:21");
+  private static final LocalDateTime TS_VAL = LocalDateTime.parse("2023-05-18T07:14:21");
+  private static final OffsetDateTime TSZ_VAL = OffsetDateTime.parse("2023-05-18T07:14:21Z");
+  private static final BigDecimal DEC_VAL = new BigDecimal("12.34");
+  private static final String STR_VAL = "foobar";
+  private static final UUID UUID_VAL = UUID.randomUUID();
+  private static final ByteBuffer BYTES_VAL = ByteBuffer.wrap(new byte[] {1, 2, 3});
+  private static final List<String> LIST_VAL = ImmutableList.of("hello", "world");
+  private static final Map<String, String> MAP_VAL = ImmutableMap.of("one", "1", "two", "2");
+
+  private static final JsonConverter JSON_CONVERTER = new JsonConverter();
+
+  static {

Review Comment:
   nit: could also be done in a `@BeforeAll` as I don't think `static` blocks are used that frequently in tests
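   Something like this would be the `@BeforeAll` equivalent (a sketch; the static block's body isn't quoted above, so the converter settings shown here are assumed based on the usual `JsonConverter` test setup):

   ```java
   import java.util.Map;
   import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
   import org.apache.kafka.connect.json.JsonConverter;
   import org.apache.kafka.connect.json.JsonConverterConfig;
   import org.apache.kafka.connect.storage.ConverterConfig;
   import org.apache.kafka.connect.storage.ConverterType;
   import org.junit.jupiter.api.BeforeAll;

   public class RecordConverterTestSetup {

     private static final JsonConverter JSON_CONVERTER = new JsonConverter();

     // runs once for the class, replacing the static initializer block
     @BeforeAll
     public static void configureConverter() {
       Map<String, Object> settings =
           ImmutableMap.of(
               JsonConverterConfig.SCHEMAS_ENABLE_CONFIG,
               false,
               ConverterConfig.TYPE_CONFIG,
               ConverterType.VALUE.getName());
       JSON_CONVERTER.configure(settings);
     }
   }
   ```
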
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java:
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.Temporal;
+import java.util.Base64;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mapping.MappedField;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Type.PrimitiveType;
+import org.apache.iceberg.types.Types.DecimalType;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.kafka.connect.data.Struct;
+
+public class RecordConverter {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  private static final DateTimeFormatter OFFSET_TS_FMT =
+      new DateTimeFormatterBuilder()
+          .append(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
+          .appendOffset("+HHmm", "Z")
+          .toFormatter();
+
+  private final Schema tableSchema;
+  private final NameMapping nameMapping;
+  private final IcebergSinkConfig config;
+  private final Map<Integer, Map<String, NestedField>> structNameMap = Maps.newHashMap();
+
+  public RecordConverter(Table table, IcebergSinkConfig config) {
+    this.tableSchema = table.schema();
+    this.nameMapping = createNameMapping(table);
+    this.config = config;
+  }
+
+  public Record convert(Object data) {
+    return convert(data, null);
+  }
+
+  public Record convert(Object data, SchemaUpdate.Consumer schemaUpdateConsumer) {
+    if (data instanceof Struct || data instanceof Map) {
+      return convertStructValue(data, tableSchema.asStruct(), -1, schemaUpdateConsumer);
+    }
+    throw new UnsupportedOperationException("Cannot convert type: " + data.getClass().getName());
+  }
+
+  private NameMapping createNameMapping(Table table) {
+    String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+    return nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
+  }
+
+  private Object convertValue(
+      Object value, Type type, int fieldId, SchemaUpdate.Consumer schemaUpdateConsumer) {
+    if (value == null) {
+      return null;
+    }
+    switch (type.typeId()) {
+      case STRUCT:
+        return convertStructValue(value, type.asStructType(), fieldId, schemaUpdateConsumer);
+      case LIST:
+        return convertListValue(value, type.asListType(), schemaUpdateConsumer);
+      case MAP:
+        return convertMapValue(value, type.asMapType(), schemaUpdateConsumer);
+      case INTEGER:
+        return convertInt(value);
+      case LONG:
+        return convertLong(value);
+      case FLOAT:
+        return convertFloat(value);
+      case DOUBLE:
+        return convertDouble(value);
+      case DECIMAL:
+        return convertDecimal(value, (DecimalType) type);
+      case BOOLEAN:
+        return convertBoolean(value);
+      case STRING:
+        return convertString(value);
+      case UUID:
+        return convertUUID(value);
+      case BINARY:
+      case FIXED:
+        return convertBase64Binary(value);
+      case DATE:
+        return convertDateValue(value);
+      case TIME:
+        return convertTimeValue(value);
+      case TIMESTAMP:
+        return convertTimestampValue(value, (TimestampType) type);
+    }
+    throw new UnsupportedOperationException("Unsupported type: " + type.typeId());
+  }
+
+  protected GenericRecord convertStructValue(
+      Object value,
+      StructType schema,
+      int parentFieldId,
+      SchemaUpdate.Consumer schemaUpdateConsumer) {
+    if (value instanceof Map) {
+      return convertToStruct((Map<?, ?>) value, schema, parentFieldId, schemaUpdateConsumer);
+    } else if (value instanceof Struct) {
+      return convertToStruct((Struct) value, schema, parentFieldId, schemaUpdateConsumer);
+    }
+    throw new IllegalArgumentException("Cannot convert to struct: " + value.getClass().getName());
+  }
+
+  private GenericRecord convertToStruct(
+      Map<?, ?> map,
+      StructType schema,
+      int structFieldId,
+      SchemaUpdate.Consumer schemaUpdateConsumer) {
+    GenericRecord result = GenericRecord.create(schema);
+    map.forEach(
+        (recordFieldNameObj, recordFieldValue) -> {
+          String recordFieldName = recordFieldNameObj.toString();
+          NestedField tableField = lookupStructField(recordFieldName, schema, structFieldId);
+          if (tableField == null) {
+            // add the column if schema evolution is on, otherwise skip the value,
+            // skip the add column if we can't infer the type
+            if (schemaUpdateConsumer != null) {
+              Type type = SchemaUtils.inferIcebergType(recordFieldValue, config);
+              if (type != null) {
+                String parentFieldName =
+                    structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
+                schemaUpdateConsumer.addColumn(parentFieldName, recordFieldName, type);
+              }
+            }
+          } else {
+            result.setField(
+                tableField.name(),
+                convertValue(
+                    recordFieldValue,
+                    tableField.type(),
+                    tableField.fieldId(),
+                    schemaUpdateConsumer));
+          }
+        });
+    return result;
+  }
+
+  private GenericRecord convertToStruct(
+      Struct struct,
+      StructType schema,
+      int structFieldId,
+      SchemaUpdate.Consumer schemaUpdateConsumer) {
+    GenericRecord result = GenericRecord.create(schema);
+    struct
+        .schema()
+        .fields()
+        .forEach(
+            recordField -> {
+              NestedField tableField = lookupStructField(recordField.name(), schema, structFieldId);
+              if (tableField == null) {
+                // add the column if schema evolution is on, otherwise skip the value
+                if (schemaUpdateConsumer != null) {
+                  String parentFieldName =
+                      structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
+                  Type type = SchemaUtils.toIcebergType(recordField.schema(), config);
+                  schemaUpdateConsumer.addColumn(parentFieldName, recordField.name(), type);
+                }
+              } else {
+                boolean hasSchemaUpdates = false;
+                if (schemaUpdateConsumer != null) {
+                  // update the type if needed and schema evolution is on
+                  PrimitiveType evolveDataType =
+                      SchemaUtils.needsDataTypeUpdate(tableField.type(), recordField.schema());
+                  if (evolveDataType != null) {
+                    String fieldName = tableSchema.findColumnName(tableField.fieldId());
+                    schemaUpdateConsumer.updateType(fieldName, evolveDataType);
+                    hasSchemaUpdates = true;
+                  }
+                  // make optional if needed and schema evolution is on
+                  if (tableField.isRequired() && recordField.schema().isOptional()) {
+                    String fieldName = tableSchema.findColumnName(tableField.fieldId());
+                    schemaUpdateConsumer.makeOptional(fieldName);
+                    hasSchemaUpdates = true;
+                  }
+                }
+                if (!hasSchemaUpdates) {
+                  result.setField(
+                      tableField.name(),
+                      convertValue(
+                          struct.get(recordField),
+                          tableField.type(),
+                          tableField.fieldId(),
+                          schemaUpdateConsumer));
+                }
+              }
+            });
+    return result;
+  }
+
+  private NestedField lookupStructField(String fieldName, StructType schema, int structFieldId) {
+    if (nameMapping == null) {
+      return config.schemaCaseInsensitive()
+          ? schema.caseInsensitiveField(fieldName)
+          : schema.field(fieldName);
+    }
+
+    return structNameMap
+        .computeIfAbsent(structFieldId, notUsed -> createStructNameMap(schema))
+        .get(fieldName);
+  }
+
+  private Map<String, NestedField> createStructNameMap(StructType schema) {
+    Map<String, NestedField> map = Maps.newHashMap();
+    schema
+        .fields()
+        .forEach(
+            col -> {
+              MappedField mappedField = nameMapping.find(col.fieldId());
+              if (mappedField != null && !mappedField.names().isEmpty()) {
+                mappedField.names().forEach(name -> map.put(name, col));
+              } else {
+                map.put(col.name(), col);
+              }
+            });
+    return map;
+  }
+
+  protected List<Object> convertListValue(
+      Object value, ListType type, SchemaUpdate.Consumer schemaUpdateConsumer) {
+    Preconditions.checkArgument(value instanceof List);
+    List<?> list = (List<?>) value;
+    return list.stream()
+        .map(
+            element -> {
+              int fieldId = type.fields().get(0).fieldId();
+              return convertValue(element, type.elementType(), fieldId, schemaUpdateConsumer);
+            })
+        .collect(Collectors.toList());
+  }
+
+  protected Map<Object, Object> convertMapValue(
+      Object value, MapType type, SchemaUpdate.Consumer schemaUpdateConsumer) {
+    Preconditions.checkArgument(value instanceof Map);
+    Map<?, ?> map = (Map<?, ?>) value;
+    Map<Object, Object> result = Maps.newHashMap();
+    map.forEach(
+        (k, v) -> {
+          int keyFieldId = type.fields().get(0).fieldId();
+          int valueFieldId = type.fields().get(1).fieldId();
+          result.put(
+              convertValue(k, type.keyType(), keyFieldId, schemaUpdateConsumer),
+              convertValue(v, type.valueType(), valueFieldId, schemaUpdateConsumer));
+        });
+    return result;
+  }
+
+  protected int convertInt(Object value) {
+    if (value instanceof Number) {
+      return ((Number) value).intValue();
+    } else if (value instanceof String) {

Review Comment:
   I've noticed that the String part for ints/longs/floats/doubles isn't tested in the existing tests. Maybe it would be good to test those if that's easily possible.
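   Something along these lines could exercise the String branches end to end (a sketch only: the diff is truncated right after the `instanceof String` check, so this assumes those branches parse with the standard `java.lang` parsers; class and method names are illustrative):

   ```java
   import static org.assertj.core.api.Assertions.assertThat;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.when;

   import org.apache.iceberg.Table;
   import org.apache.iceberg.connect.IcebergSinkConfig;
   import org.apache.iceberg.data.Record;
   import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
   import org.apache.iceberg.types.Types;
   import org.apache.iceberg.types.Types.DoubleType;
   import org.apache.iceberg.types.Types.FloatType;
   import org.apache.iceberg.types.Types.IntegerType;
   import org.apache.iceberg.types.Types.LongType;
   import org.junit.jupiter.api.Test;

   public class RecordConverterNumericStringTest {

     private static final org.apache.iceberg.Schema NUMERIC_SCHEMA =
         new org.apache.iceberg.Schema(
             Types.NestedField.required(1, "i", IntegerType.get()),
             Types.NestedField.required(2, "l", LongType.get()),
             Types.NestedField.required(3, "fl", FloatType.get()),
             Types.NestedField.required(4, "do", DoubleType.get()));

     @Test
     public void testStringToNumericConversion() {
       Table table = mock(Table.class);
       when(table.schema()).thenReturn(NUMERIC_SCHEMA);
       when(table.properties()).thenReturn(ImmutableMap.of());

       RecordConverter converter = new RecordConverter(table, mock(IcebergSinkConfig.class));

       // every numeric value is supplied as a String, forcing the String branches
       Record record =
           converter.convert(
               ImmutableMap.of("i", "123", "l", "-4567890123", "fl", "1.5", "do", "2.5"));

       assertThat(record.getField("i")).isEqualTo(123);
       assertThat(record.getField("l")).isEqualTo(-4567890123L);
       assertThat(record.getField("fl")).isEqualTo(1.5f);
       assertThat(record.getField("do")).isEqualTo(2.5d);
     }
   }
   ```
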
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordProjection.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+
+/**
+ * This is modified from {@link org.apache.iceberg.util.StructProjection} to support record types.
+ */
+public class RecordProjection implements Record {
+
+  /**
+   * Creates a projecting wrapper for {@link Record} rows.
+   *
+   * <p>This projection does not work with repeated types like lists and maps.
+   *
+   * @param dataSchema schema of rows wrapped by this projection
+   * @param projectedSchema result schema of the projected rows
+   * @return a wrapper to project rows
+   */
+  public static RecordProjection create(Schema dataSchema, Schema projectedSchema) {
+    return new RecordProjection(dataSchema.asStruct(), projectedSchema.asStruct());
+  }
+
+  private final StructType type;
+  private final int[] positionMap;
+  private final RecordProjection[] nestedProjections;
+  private Record record;
+
+  private RecordProjection(StructType structType, StructType projection) {
+    this(structType, projection, false);
+  }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  private RecordProjection(StructType structType, StructType projection, boolean allowMissing) {
+    this.type = projection;
+    this.positionMap = new int[projection.fields().size()];
+    this.nestedProjections = new RecordProjection[projection.fields().size()];
+
+    // set up the projection positions and any nested projections that are needed
+    List<NestedField> dataFields = structType.fields();
+    for (int pos = 0; pos < positionMap.length; pos += 1) {
+      NestedField projectedField = projection.fields().get(pos);
+
+      boolean found = false;
+      for (int i = 0; !found && i < dataFields.size(); i += 1) {
+        NestedField dataField = dataFields.get(i);
+        if (projectedField.fieldId() == dataField.fieldId()) {
+          found = true;
+          positionMap[pos] = i;
+          switch (projectedField.type().typeId()) {
+            case STRUCT:
+              nestedProjections[pos] =
+                  new RecordProjection(
+                      dataField.type().asStructType(), projectedField.type().asStructType());
+              break;
+            case MAP:
+              MapType projectedMap = projectedField.type().asMapType();
+              MapType originalMap = dataField.type().asMapType();
+
+              boolean keyProjectable =
+                  !projectedMap.keyType().isNestedType()
+                      || projectedMap.keyType().equals(originalMap.keyType());
+              boolean valueProjectable =
+                  !projectedMap.valueType().isNestedType()
+                      || projectedMap.valueType().equals(originalMap.valueType());
+              Preconditions.checkArgument(
+                  keyProjectable && valueProjectable,
+                  "Cannot project a partial map key or value struct. Trying to project %s out of %s",
+                  projectedField,
+                  dataField);
+
+              nestedProjections[pos] = null;
+              break;
+            case LIST:
+              ListType projectedList = projectedField.type().asListType();
+              ListType originalList = dataField.type().asListType();
+
+              boolean elementProjectable =
+                  !projectedList.elementType().isNestedType()
+                      || projectedList.elementType().equals(originalList.elementType());
+              Preconditions.checkArgument(
+                  elementProjectable,
+                  "Cannot project a partial list element struct. Trying to project %s out of %s",
+                  projectedField,
+                  dataField);
+
+              nestedProjections[pos] = null;
+              break;
+            default:
+              nestedProjections[pos] = null;
+          }
+        }
+      }
+
+      if (!found && projectedField.isOptional() && allowMissing) {
+        positionMap[pos] = -1;
+        nestedProjections[pos] = null;
+      } else if (!found) {
+        throw new IllegalArgumentException(
+            String.format("Cannot find field %s in %s", projectedField, structType));
+      }
+    }
+  }
+
+  public RecordProjection wrap(Record newRecord) {
+    this.record = newRecord;
+    return this;
+  }
+
+  @Override
+  public int size() {
+    return type.fields().size();
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    // struct can be null if wrap is not called first before the get call
+    // or if a null struct is wrapped.
+    if (record == null) {
+      return null;
+    }
+
+    int recordPos = positionMap[pos];
+    if (nestedProjections[pos] != null) {
+      Record nestedStruct = record.get(recordPos, Record.class);
+      if (nestedStruct == null) {
+        return null;
+      }
+
+      return javaClass.cast(nestedProjections[pos].wrap(nestedStruct));
+    }
+
+    if (recordPos != -1) {
+      return record.get(recordPos, javaClass);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    throw new UnsupportedOperationException();

Review Comment:
   I think it would be good if all of the UOEs had a short message saying what exactly isn't supported. That makes it easier to understand where the UOE is coming from whenever it is thrown and stack traces aren't shown in the logs.
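   For example (the exact wording is of course up for debate; `setField` below is another of the unsupported `Record` mutators that could get the same treatment):

   ```java
   @Override
   public <T> void set(int pos, T value) {
     // identify the class and operation so a bare log line is enough to trace the failure
     throw new UnsupportedOperationException("Cannot set a field on a RecordProjection");
   }

   @Override
   public void setField(String name, Object value) {
     throw new UnsupportedOperationException("Cannot set a field on a RecordProjection");
   }
   ```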