fqaiser94 commented on code in PR #10351:
URL: https://github.com/apache/iceberg/pull/10351#discussion_r1621333676
########## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java ##########

```java
  @Override
  public String version() {
    return IcebergSinkConfig.version();
```

Review Comment:
Looks like this method was implemented before this PR, but I have a question about it. This method returns a String that looks something like this:
```
IcebergBuild.version() + "-kc-" + kcVersion;
```
where `kcVersion = IcebergSinkConfig.class.getPackage().getImplementationVersion()`.

Won't `kcVersion` and `IcebergBuild.version()` be the same value, since AFAIK this connector's releases will be tied to the general Iceberg releases? CMIIW.

Note: when I run the existing unit test locally, I currently get `1.6.0-SNAPSHOT-kc-unknown`.
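For context on the `-kc-unknown` part of that observed value: `Package.getImplementationVersion()` reads the `Implementation-Version` entry from the jar's `MANIFEST.MF` and returns `null` when the classes are loaded from a directory, which is typical for local unit test runs. A minimal sketch of how a string like the one above could be assembled; the class name, the hardcoded version, and the `"unknown"` fallback are assumptions for illustration, not the connector's actual code:

```java
// Minimal illustration, not the connector's implementation.
public class VersionStringSketch {

  public static void main(String[] args) {
    // Stand-in for IcebergBuild.version(); hardcoded for illustration.
    String icebergVersion = "1.6.0-SNAPSHOT";

    // Returns the Implementation-Version manifest entry, or null when the class
    // was not loaded from a packaged jar (e.g. a local unit test run).
    String kcVersion = VersionStringSketch.class.getPackage().getImplementationVersion();
    if (kcVersion == null) {
      kcVersion = "unknown"; // assumed fallback, matching the observed "-kc-unknown"
    }

    System.out.println(icebergVersion + "-kc-" + kcVersion);
  }
}
```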
########## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java ##########

```java
public class IcebergSinkTask extends SinkTask {

  private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkTask.class);

  private IcebergSinkConfig config;
  private Catalog catalog;
```

Review Comment:
As far as I know, Catalog objects are not guaranteed to be thread-safe. And at least on the "leader" tasks, we can have multiple threads using the same Catalog instance at the same time ("leader" tasks have both a main thread as well as a CoordinatorThread). I haven't heard users report any issues about this, but it would be better to avoid this risk entirely?
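Since the concern above is about sharing one `Catalog` instance across threads, here is a hedged sketch (not code from this PR) of one way to avoid it: hand each thread a factory and let it build and own its own instance. `CatalogUtil.buildIcebergCatalog` is an existing Iceberg API; the supplier shape, the parameter names, and passing `null` for the Hadoop configuration are assumptions for illustration.

```java
import java.util.Map;
import java.util.function.Supplier;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;

public class CatalogPerThreadSketch {

  // catalogName / catalogProps are hypothetical stand-ins for values the connector
  // would derive from IcebergSinkConfig.
  public static Supplier<Catalog> catalogSupplier(
      String catalogName, Map<String, String> catalogProps) {
    // Each call builds a fresh Catalog, so an instance handed to the CoordinatorThread
    // is never shared with the task's main thread. The last argument is the Hadoop
    // Configuration, if the chosen catalog implementation needs one.
    return () -> CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, null);
  }
}
```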
########## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java ##########

```java
  private void routeRecordStatically(SinkRecord record) {
    String routeField = config.tablesRouteField();

    if (routeField == null) {
      // route to all tables
      config
          .tables()
          .forEach(
              tableName -> {
                writerForTable(tableName, record, false).write(record);
              });

    } else {
      String routeValue = extractRouteValue(record.value(), routeField);
      if (routeValue != null) {
        config
            .tables()
            .forEach(
                tableName -> {
                  Pattern regex = config.tableConfig(tableName).routeRegex();
                  if (regex != null && regex.matcher(routeValue).matches()) {
                    writerForTable(tableName, record, false).write(record);
                  }
                });
      }
    }
  }
```

Review Comment:
nit: is this really "static" routing? I guess it's a static list of tables, but the table(s) each message is written to are determined dynamically based on the route value ...

More importantly, is this an important enough use case to support within the connector? I would strongly prefer that we didn't support this within the connector itself (users can easily implement this by writing an SMT + dynamic mode).
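To make the "SMT + dynamic mode" alternative concrete, here is a hedged sketch of a user-side Single Message Transform (not part of this PR) that applies a regex to a field and writes the matched table name into the record; the sink would then run in dynamic mode with its route field pointed at the added `_route` field. The class name, config keys, and `_route` field are hypothetical, and for brevity it only handles schemaless `Map` values (a real SMT would also rebuild `Struct` schemas).

```java
package com.example.smt; // hypothetical package

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class RegexRouteField<R extends ConnectRecord<R>> implements Transformation<R> {

  private String sourceField; // field to match, e.g. "type"
  private Pattern pattern;    // regex to apply, e.g. "order_.*"
  private String targetTable; // table to route matches to, e.g. "db.orders"

  @Override
  public void configure(Map<String, ?> configs) {
    this.sourceField = (String) configs.get("source.field");
    this.pattern = Pattern.compile((String) configs.get("regex"));
    this.targetTable = (String) configs.get("table");
  }

  @Override
  @SuppressWarnings("unchecked")
  public R apply(R record) {
    // Only schemaless Map values are handled in this sketch.
    if (!(record.value() instanceof Map)) {
      return record;
    }
    Map<String, Object> value = new HashMap<>((Map<String, Object>) record.value());
    Object routeValue = value.get(sourceField);
    if (routeValue != null && pattern.matcher(routeValue.toString()).matches()) {
      value.put("_route", targetTable);
    }
    // Copy the record with the enriched value; schema stays null for schemaless data.
    return record.newRecord(
        record.topic(),
        record.kafkaPartition(),
        record.keySchema(),
        record.key(),
        null,
        value,
        record.timestamp());
  }

  @Override
  public ConfigDef config() {
    return new ConfigDef();
  }

  @Override
  public void close() {}
}
```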
########## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java ##########

```java
    sourceOffsets.put(
        new TopicPartition(record.topic(), record.kafkaPartition()),
        new Offset(record.kafkaOffset() + 1, timestamp));

    if (config.dynamicTablesEnabled()) {
      routeRecordDynamically(record);
    } else {
      routeRecordStatically(record);
    }
```

Review Comment:
Same here; we really don't need to check this on every record. Feels like we're missing an abstraction here, something like a `Router` concept with a `StaticRouter` implementation and a `DynamicRouter` implementation, where only one of those is constructed for the lifetime of a `SinkWriter`, based on the `config.dynamicTablesEnabled()` setting.
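A minimal sketch of the shape that suggestion could take; apart from the `Router`/`StaticRouter`/`DynamicRouter` names used in the comment above, everything here (the interface method, the factory) is an assumption rather than code from this PR.

```java
import org.apache.kafka.connect.sink.SinkRecord;

interface Router {
  void route(SinkRecord record);
}

class StaticRouter implements Router {
  @Override
  public void route(SinkRecord record) {
    // resolve the configured table list (and optional route regexes) for this record
  }
}

class DynamicRouter implements Router {
  @Override
  public void route(SinkRecord record) {
    // extract the route field value and write to the table it names
  }
}

class RouterSketch {
  // Chosen once for the lifetime of the SinkWriter, instead of branching per record.
  static Router forConfig(boolean dynamicTablesEnabled) {
    return dynamicTablesEnabled ? new DynamicRouter() : new StaticRouter();
  }
}
```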
########## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java ##########

```java
  private void routeRecordDynamically(SinkRecord record) {
    String routeField = config.tablesRouteField();
    Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing");
```

Review Comment:
This should really be checked at config parsing time or at SinkWriter construction time, instead of on every record. To be clear: I'm not worried about this from a performance perspective (I'm confident the JVM will optimize this away), it just seems awkward.
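For illustration, a hedged sketch of moving that check to construction time; the constructor body mirrors the one in this PR's `SinkWriter`, and the exact placement of the check is only a suggestion.

```java
  // Sketch only: validate once when the SinkWriter is built, not per record.
  public SinkWriter(Catalog catalog, IcebergSinkConfig config) {
    if (config.dynamicTablesEnabled()) {
      Preconditions.checkNotNull(
          config.tablesRouteField(), "Route field cannot be null with dynamic routing");
    }
    this.config = config;
    this.writerFactory = new IcebergWriterFactory(catalog, config);
    this.writers = Maps.newHashMap();
    this.sourceOffsets = Maps.newHashMap();
  }
```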
########## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java ##########

```java
    String routeValue = extractRouteValue(record.value(), routeField);
    if (routeValue != null) {
```

Review Comment:
IMO we should throw an error instead of dropping the record. Users can easily filter out messages using an SMT to get the same behaviour, if necessary. In the future, I imagine we can allow users to supply custom exception-handler implementations, which would also let users drop records on error.
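A hedged sketch of what failing fast could look like here; `ConnectException` (from `org.apache.kafka.connect.errors`) is one plausible choice of exception, not necessarily what the PR would use, and the message text is made up.

```java
    // Sketch only: fail fast instead of silently dropping the record.
    // Requires org.apache.kafka.connect.errors.ConnectException.
    String routeValue = extractRouteValue(record.value(), routeField);
    if (routeValue == null) {
      throw new ConnectException(
          "Null value for route field '" + routeField + "' in record from topic " + record.topic());
    }
    writerForTable(routeValue.toLowerCase(), record, true).write(record);
```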
########## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java ##########

```java
    String routeValue = extractRouteValue(record.value(), routeField);
    if (routeValue != null) {
      String tableName = routeValue.toLowerCase();
      writerForTable(tableName, record, true).write(record);
    }
```

Review Comment:
Can we support writing to multiple tables if this is a comma-separated list of table names, like we do in "static" mode? I think this is important because it would let us remove the regex-matcher configs, since that behaviour could be implemented entirely as an SMT + dynamic mode.
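To illustrate the suggestion, a hedged sketch of splitting the route value on commas before writing; the trimming and lowercasing details are assumptions, not agreed behaviour.

```java
    // Sketch only: treat the route value as a comma-separated list of target tables.
    String routeValue = extractRouteValue(record.value(), routeField);
    if (routeValue != null) {
      for (String tableName : routeValue.split(",")) {
        writerForTable(tableName.trim().toLowerCase(), record, true).write(record);
      }
    }
```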
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org