trolle4 commented on issue #11796: URL: https://github.com/apache/iceberg/issues/11796#issuecomment-2545131515
I also added some debug logging inside the class `org.apache.iceberg.connect.channel.CommitterImpl`:

```java
@Override
public void save(Collection<SinkRecord> sinkRecords) {
  if (sinkRecords != null && !sinkRecords.isEmpty()) {
    worker.save(sinkRecords);
    // Debug logging: only emitted when the first record's topic name contains "iceberg"
    if (sinkRecords.iterator().next().topic().contains("iceberg")) {
      LOG.info(
          "Saving {} records. Last offset {}",
          sinkRecords.size(),
          sinkRecords.stream()
              .reduce((first, second) -> second)
              .map(SinkRecord::kafkaOffset)
              .orElse(-1L));
    }
  }
  processControlEvents();
}
```

This produced the following logs for the problem described above:

```
2024-12-16 06:40:15,390 INFO Saving 1 records. Last offset 151629 [org.apache.iceberg.connect.channel.CommitterImpl]
2024-12-16 06:40:15,661 INFO Saving 1 records. Last offset 151630 [org.apache.iceberg.connect.channel.CommitterImpl]
```

So the message is read from the source Kafka topic when it should be.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org