trolle4 commented on issue #11796:
URL: https://github.com/apache/iceberg/issues/11796#issuecomment-2545131515
I also added some debug logging to the class
`org.apache.iceberg.connect.channel.CommitterImpl`:
```java
@Override
public void save(Collection<SinkRecord> sinkRecords) {
  if (sinkRecords != null && !sinkRecords.isEmpty()) {
    worker.save(sinkRecords);
    if (sinkRecords.iterator().next().topic().contains("iceberg")) {
      LOG.info(
          "Saving {} records. Last offset {}",
          sinkRecords.size(),
          sinkRecords.stream()
              .reduce((first, second) -> second)
              .map(SinkRecord::kafkaOffset)
              .orElse(-1L));
    }
  }
  processControlEvents();
}
```
This produced the following logs for the problem described above:
```
2024-12-16 06:40:15,390 INFO Saving 1 records. Last offset 151629 [org.apache.iceberg.connect.channel.CommitterImpl]
2024-12-16 06:40:15,661 INFO Saving 1 records. Last offset 151630 [org.apache.iceberg.connect.channel.CommitterImpl]
```
So the message is read from the source Kafka topic at the time it should be.
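
For anyone who wants to try the last-offset expression from the debug logging outside of Kafka Connect, here is a minimal standalone sketch. `LastOffsetSketch`, `FakeRecord`, and the topic name are hypothetical stand-ins made up for illustration; only the `reduce((first, second) -> second)` chain is taken from the snippet above. `FakeRecord` models just the `topic()` and `kafkaOffset()` accessors and is not the real `SinkRecord`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class LastOffsetSketch {

  // Hypothetical stand-in for SinkRecord; only the accessors used by the
  // debug logging are modeled here.
  static final class FakeRecord {
    private final String topic;
    private final long kafkaOffset;

    FakeRecord(String topic, long kafkaOffset) {
      this.topic = topic;
      this.kafkaOffset = kafkaOffset;
    }

    String topic() {
      return topic;
    }

    long kafkaOffset() {
      return kafkaOffset;
    }
  }

  // Returns the offset of the last record in iteration order,
  // or -1 when the collection is empty.
  static long lastOffset(Collection<FakeRecord> records) {
    return records.stream()
        .reduce((first, second) -> second) // keep only the last element
        .map(FakeRecord::kafkaOffset)
        .orElse(-1L);
  }

  public static void main(String[] args) {
    // Assumed topic name, chosen only so that it contains "iceberg".
    List<FakeRecord> batch =
        Arrays.asList(
            new FakeRecord("iceberg-source", 151629L),
            new FakeRecord("iceberg-source", 151630L));

    System.out.println(lastOffset(batch)); // prints 151630
    System.out.println(lastOffset(Collections.emptyList())); // prints -1
  }
}
```

Note that the logged value is the offset of the last record in the batch as iterated, not necessarily the highest offset, and the `-1` fallback only applies to an empty batch.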
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]