trolle4 commented on issue #11796:
URL: https://github.com/apache/iceberg/issues/11796#issuecomment-2545131515

   I also added some debug logging inside the class `org.apache.iceberg.connect.channel.CommitterImpl`:
   ```java
   @Override
   public void save(Collection<SinkRecord> sinkRecords) {
     if (sinkRecords != null && !sinkRecords.isEmpty()) {
       worker.save(sinkRecords);
       // Debug logging: only for batches whose first record comes from a topic containing "iceberg"
       if (sinkRecords.iterator().next().topic().contains("iceberg")) {
         LOG.info(
             "Saving {} records. Last offset {}",
             sinkRecords.size(),
             // reduce keeps the last record in iteration order
             sinkRecords.stream()
                 .reduce((first, second) -> second)
                 .map(SinkRecord::kafkaOffset)
                 .orElse(-1L));
       }
     }
     processControlEvents();
   }
   ```
   
   This produced the following logs for the problem described above:
   
   ```
   2024-12-16 06:40:15,390 INFO  Saving 1 records. Last offset 151629 [org.apache.iceberg.connect.channel.CommitterImpl]
   2024-12-16 06:40:15,661 INFO  Saving 1 records. Last offset 151630 [org.apache.iceberg.connect.channel.CommitterImpl]
   ``` 
   
   So the message is read from the source Kafka topic when it should be.
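
   For anyone who wants to check the "last offset" expression from the logging code above in isolation, here is a minimal sketch. `Rec` is a hypothetical stand-in for Kafka Connect's `SinkRecord` (so the snippet runs without Kafka on the classpath), and `LastOffsetSketch` and `lastOffset` are illustrative names, not part of Iceberg. It assumes the collection iterates in offset order.

   ```java
   import java.util.Collection;
   import java.util.List;

   class LastOffsetSketch {

     // Hypothetical stand-in for org.apache.kafka.connect.sink.SinkRecord
     static final class Rec {
       private final String topic;
       private final long kafkaOffset;

       Rec(String topic, long kafkaOffset) {
         this.topic = topic;
         this.kafkaOffset = kafkaOffset;
       }

       String topic() {
         return topic;
       }

       long kafkaOffset() {
         return kafkaOffset;
       }
     }

     // Returns the offset of the last record in iteration order, or -1 for an empty batch
     static long lastOffset(Collection<Rec> records) {
       return records.stream()
           .reduce((first, second) -> second) // keep only the most recently seen element
           .map(Rec::kafkaOffset)
           .orElse(-1L);
     }

     public static void main(String[] args) {
       List<Rec> batch =
           List.of(new Rec("iceberg-events", 151629L), new Rec("iceberg-events", 151630L));
       System.out.println(lastOffset(batch)); // prints 151630
       System.out.println(lastOffset(List.of())); // prints -1
     }
   }
   ```

   With one record per batch, as in the logs above, the logged value is simply that record's offset.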


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
