fqaiser94 commented on code in PR #10351: URL: https://github.com/apache/iceberg/pull/10351#discussion_r1616117443
########## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect; + +import java.util.Collection; +import java.util.Map; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IcebergSinkTask extends SinkTask { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkTask.class); + + private IcebergSinkConfig config; + private Catalog catalog; + private Committer committer; + + @Override + public String version() { + return IcebergSinkConfig.version(); + } + + @Override + public void start(Map<String, String> props) { + this.config = new IcebergSinkConfig(props); + } + + @Override + public void open(Collection<TopicPartition> partitions) { + catalog = CatalogUtils.loadCatalog(config); + committer = 
CommitterFactory.createCommitter(config); + committer.start(catalog, config, context, partitions); + } + + @Override + public void close(Collection<TopicPartition> partitions) { + close(); + } + + private void close() { + if (committer != null) { + committer.stop(); + committer = null; + } + + if (catalog != null) { + if (catalog instanceof AutoCloseable) { + try { + ((AutoCloseable) catalog).close(); + } catch (Exception e) { + LOG.warn("An error occurred closing catalog instance, ignoring...", e); + } + } + catalog = null; + } + } + + @Override + public void put(Collection<SinkRecord> sinkRecords) { + if (committer != null) { + committer.save(sinkRecords); + } + } + + @Override + public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) { + committer.save(null); + } Review Comment: Why are we overriding the `flush` method when we don't have any flush-specific code path in Committer? The `put` method will be called on a regular basis (potentially with an empty collection of sink records) so this feels redundant. Also, I'm fairly certain that this `flush` method will never actually be called since we are overriding the `preCommit` method. The default `preCommit` implementation is the only place where `flush` is called by the Kafka Connect runtime. Unless you call `flush` yourself in the `preCommit` method you've defined (or anywhere else), `flush` will never actually be called. Overall, I would recommend you just omit this `flush` method definition from this class. ########## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect; + +import java.util.Collection; +import java.util.Map; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IcebergSinkTask extends SinkTask { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkTask.class); + + private IcebergSinkConfig config; + private Catalog catalog; + private Committer committer; + + @Override + public String version() { + return IcebergSinkConfig.version(); + } + + @Override + public void start(Map<String, String> props) { + this.config = new IcebergSinkConfig(props); + } + + @Override + public void open(Collection<TopicPartition> partitions) { + catalog = CatalogUtils.loadCatalog(config); + committer = CommitterFactory.createCommitter(config); + committer.start(catalog, config, context, partitions); + } + + @Override + public void close(Collection<TopicPartition> partitions) { + close(); + } + + private void close() { + if (committer != null) { + committer.stop(); + committer = null; + } + + if (catalog != null) { + if (catalog instanceof AutoCloseable) { 
+ try { + ((AutoCloseable) catalog).close(); + } catch (Exception e) { + LOG.warn("An error occurred closing catalog instance, ignoring...", e); + } + } + catalog = null; + } + } Review Comment: nit: can you move these `close` methods so they're after `preCommit` but before `stop`? Just so these methods are arranged in life-cycle order. ########## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect; + +import java.util.Collection; +import java.util.Map; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IcebergSinkTask extends SinkTask { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkTask.class); + + private IcebergSinkConfig config; + private Catalog catalog; + private Committer committer; + + @Override + public String version() { + return IcebergSinkConfig.version(); + } + + @Override + public void start(Map<String, String> props) { + this.config = new IcebergSinkConfig(props); + } + + @Override + public void open(Collection<TopicPartition> partitions) { + catalog = CatalogUtils.loadCatalog(config); Review Comment: I'm not 100% sure about this, but IIRC from past experience, Kafka Connect doesn't really guarantee that `close` will be called before `open` if a SinkTask instance is reused; this can lead to resource leaks (or worse). So I usually defensively call `close` myself at the start of the `open` method, before opening new resources (the catalog and committer in this case). Note: I can't find any documentation/issues to explicitly support this right now, but looking at how things are implemented in the Kafka Connect runtime using the ConsumerRebalanceListener API, I think I can see at least one way this could happen (`open` called without/before `close`). ########## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect; + +import java.util.Collection; +import java.util.Map; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IcebergSinkTask extends SinkTask { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkTask.class); + + private IcebergSinkConfig config; + private Catalog catalog; + private Committer committer; + + @Override + public String version() { + return IcebergSinkConfig.version(); + } + + @Override + public void start(Map<String, String> props) { + this.config = new IcebergSinkConfig(props); + } + + @Override + public void open(Collection<TopicPartition> partitions) { + catalog = CatalogUtils.loadCatalog(config); Review Comment: `open` method is called with only the newly assigned partitions. 
Is there a strong reason to pass just the _newly assigned_ partitions to the `Committer.start` method when the Committer can just retrieve _all_ partitions assigned to this task via `context.assignment()` anyway? I'm also worried we might have a bug here. The Committer implementation uses this `partitions` argument to check whether partition 0 of the first topic is assigned to this task and, if so, spawns a Coordinator process. I'm worried that if there were a rebalance in which partition 0 of the first topic doesn't move between tasks, then it would not be included in the `partitions` argument for any Task, and thus we could potentially end up with a Connector that doesn't have any Coordinator process running on any Task. Thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org