mblesak commented on issue #13593: URL: https://github.com/apache/iceberg/issues/13593#issuecomment-3095269275
This part seems to be the problem: https://github.com/apache/iceberg/blob/77f1f5ba47f0e5250b8af4e16cbe6febc8244ee5/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java#L138

Current implementation:

```java
@Override
public void close(Collection<TopicPartition> closedPartitions) {
  if (hasLeaderPartition(closedPartitions)) {
    LOG.info("Committer lost leader partition. Stopping Coordinator.");
    stopCoordinator();
  }
  stopWorker();
  KafkaUtils.seekToLastCommittedOffsets(context);
}
```

The close operation of `IcebergSinkTask` closes the resources it instantiated, including the Catalog. The Committer (and its Coordinator) should be closed at that point too. However, `stopCoordinator()` is only called when the closed partitions include the leader partition, so under race conditions the Coordinator's background thread can survive and keep operating on resources that have already been closed. This happens especially during frequent rebalancing, such as MSK Connect performs in an autoscaled configuration.

Testing the connector with this implementation:

```java
@Override
public void close(Collection<TopicPartition> closedPartitions) {
  // Stop the Coordinator unconditionally so its background thread cannot outlive the task's resources
  stopCoordinator();
  stopWorker();
  KafkaUtils.seekToLastCommittedOffsets(context);
}
```

The Committer (and Coordinator) must be closed/terminated together with `IcebergSinkTask`.
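To make the race concrete, here is a minimal, self-contained Java sketch that models it without Kafka or Iceberg. All names in it (`FakeCatalog`, `FakeCoordinator`, `CloseDemo`, `LEADER_PARTITION`) are hypothetical stand-ins, and partitions are plain integers rather than `TopicPartition`. It assumes, following the description above, that the task closes the shared catalog right after the committer's `close()` runs. With the conditional stop, closing only non-leader partitions leaves the coordinator thread running against a closed catalog. With the unconditional stop, the thread is always joined before the catalog is closed.

```java
import java.util.Collection;

// Hypothetical stand-in for a shared, closable resource such as the Catalog.
final class FakeCatalog implements AutoCloseable {
  private volatile boolean closed = false;

  void commit() {
    if (closed) {
      throw new IllegalStateException("catalog already closed");
    }
  }

  @Override
  public void close() {
    closed = true;
  }
}

// Hypothetical background coordinator that keeps calling into the catalog until stopped.
final class FakeCoordinator {
  private final FakeCatalog catalog;
  private final Thread thread;
  private volatile boolean running = true;

  FakeCoordinator(FakeCatalog catalog) {
    this.catalog = catalog;
    this.thread = new Thread(this::loop, "fake-coordinator");
  }

  void start() {
    thread.start();
  }

  private void loop() {
    while (running) {
      try {
        catalog.commit();
      } catch (IllegalStateException e) {
        // A surviving coordinator ends up here: it is operating on an already closed resource.
      }
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        return; // interrupted by stop()
      }
    }
  }

  // Signals the loop to exit and waits for the thread to terminate; safe to call more than once.
  void stop() {
    running = false;
    thread.interrupt();
    try {
      thread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  boolean isAlive() {
    return thread.isAlive();
  }
}

final class CloseDemo {
  // Hypothetical: the partition whose owner hosts the coordinator.
  static final int LEADER_PARTITION = 0;

  // Returns true if the coordinator thread is still running after the task has closed the catalog.
  static boolean coordinatorSurvives(Collection<Integer> closedPartitions, boolean unconditionalStop) {
    FakeCatalog catalog = new FakeCatalog();
    FakeCoordinator coordinator = new FakeCoordinator(catalog);
    coordinator.start();

    // Committer close(): the current code stops the coordinator only if the leader partition
    // was closed; the proposed code stops it unconditionally.
    if (unconditionalStop || closedPartitions.contains(LEADER_PARTITION)) {
      coordinator.stop();
    }

    // Task close(): shared resources such as the catalog are closed.
    catalog.close();

    boolean survived = coordinator.isAlive();
    coordinator.stop(); // clean up so the sketch does not leak a thread
    return survived;
  }
}
```

In this sketch, `CloseDemo.coordinatorSurvives(List.of(1, 2), false)` returns `true` (the conditional stop leaves the thread running), while passing `true` for `unconditionalStop` returns `false`. Because `stop()` here is idempotent, stopping unconditionally is harmless even when the coordinator has already been stopped; whether the real `stopCoordinator()` is equally safe to call on a task that does not host the Coordinator is an assumption worth checking.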