mblesak commented on issue #13593:
URL: https://github.com/apache/iceberg/issues/13593#issuecomment-3095269275

   This part seems to be problem:
   
https://github.com/apache/iceberg/blob/77f1f5ba47f0e5250b8af4e16cbe6febc8244ee5/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java#L138
   
   Current implementation:
   ```
     @Override
     public void close(Collection<TopicPartition> closedPartitions) {
       if (hasLeaderPartition(closedPartitions)) {
         LOG.info("Committer lost leader partition. Stopping Coordinator.");
         stopCoordinator();
       }
       stopWorker();
       KafkaUtils.seekToLastCommittedOffsets(context);
     }
   ```
   
   The Close operation of the IcebergSinkTask closes the instantiated assets 
including Catalog.
   The Commiter (and Coordinator) wants to be also closed but due to condition 
(in race conditions circumstances) the background thread can survive and 
continue operates with already closed resources. This can happen especially 
during frequent rebalancing like the MSK Connect in Autoscaled configuration 
does.
   
   Testing connector with implementation:
   ```
     @Override
     public void close(Collection<TopicPartition> closedPartitions) {
       stopCoordinator();
       stopWorker();
       KafkaUtils.seekToLastCommittedOffsets(context);
     }
   ```
   
   Commiter (and Coordinator) must be closed/terminated with IcebergSinkTask.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to