anmol commented on code in PR #9641:
URL: https://github.com/apache/iceberg/pull/9641#discussion_r1802251828
########## kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java: ##########
@@ -52,20 +51,22 @@ public IcebergWriter(Table table, String tableName, IcebergSinkConfig config) {
   private void initNewWriter() {
     this.writer = Utilities.createTableWriter(table, tableName, config);
-    // FIXME: update this when the record converter is added
-    // this.recordConverter = new RecordConverter(table, config);
+    this.recordConverter = new RecordConverter(table, config);

Review Comment:
Thanks for the positive responses, @bryanck and @ajantha-bhat. Since the feature we are discussing is not in this repo yet, what would be the right place for the discussion? I think we can join the discussion in this issue: https://github.com/tabular-io/iceberg-kafka-connect/issues/303. Please share if you have other suggestions.

Some more background on the issue and the environment:

Goal: implement a CDC pipeline from Oracle RDS to Iceberg on S3.

Components:
- Source: Oracle RDS with LogMiner
- Kafka Connect source connector: Debezium-based (on the MSK Connect platform, Kafka Connect version 2.7.1; AWS does not support a higher version at the moment)
- Kafka broker: MSK Provisioned (3.7.x.kraft)
- Kafka Connect sink connector: based on tabular-io/iceberg-kafka-connect (**v0.6.15**; we cannot use a higher version because it is incompatible with Kafka Connect 2.7.1)

Current state: the sink connector works fine for the most part, but it failed UAT because some records were not deleted. We suspect an edge case in the worker-coordinator communication. Here is a summary of the flow:

1. The coordinator sends COMMIT_REQUEST C1.
2. C1 times out. The coordinator attempts a partial commit, but no data is committed.
3. The workers handle COMMIT_REQUEST C1 and send COMMIT_RESPONSE and COMMIT_READY for C1. The insert data files are created during this response.
4. The coordinator receives COMMIT_RESPONSE and COMMIT_READY for C1. It logs that no commit is currently in progress; however, the commit buffers would still be updated (correct me if I'm wrong).
5. The coordinator sends COMMIT_REQUEST C2.
6. The workers handle COMMIT_REQUEST C2 and send COMMIT_RESPONSE and COMMIT_READY for C2. The equality deletes (corresponding to the insert data files from C1) are created during this response.
7. C2 times out. The coordinator attempts a partial commit, and all the new data is committed in a single snapshot.
8. In Iceberg, the insert data files and the equality delete files are therefore part of the same snapshot, with the same sequence number. Because Iceberg only applies an equality delete to data files whose sequence number is STRICTLY LESS than the delete file's own sequence number, the equality delete files are ignored.
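To make steps 1-8 concrete, here is a minimal, hypothetical Java model of the timeline. `Coordinator`, `onResponse`, `commitSnapshot` and `TableFile` are illustrative names, not classes from the connector or the Iceberg API. It assumes, as step 4 suggests, that late C1 responses are still buffered, and that everything buffered at commit time goes into one snapshot sharing one sequence number. The only Iceberg rule it encodes is the one from step 8: per the table spec, an equality delete applies to a data file only if the data file's data sequence number is strictly less than the delete's. Partition matching is ignored here.

```java
import java.util.ArrayList;
import java.util.List;

public class EqDeleteSeqSketch {

  /** A file committed to the table, tagged with the data sequence number of its snapshot. */
  record TableFile(String commitId, boolean isEqualityDelete, long seq) {}

  /** Hypothetical coordinator: buffers worker responses and commits the buffer as one snapshot. */
  static final class Coordinator {
    private final List<TableFile> table = new ArrayList<>();
    private final List<TableFile> buffer = new ArrayList<>();
    private long lastSeq = 0;

    // A worker response for `commitId`; assumed to be buffered even if that commit already timed out.
    void onResponse(String commitId, boolean isEqualityDelete) {
      buffer.add(new TableFile(commitId, isEqualityDelete, -1)); // seq is assigned at commit time
    }

    // Commits everything buffered as ONE snapshot, so every file gets the same sequence number.
    void commitSnapshot() {
      if (buffer.isEmpty()) {
        return; // nothing to commit, e.g. C1's partial commit in step 2
      }
      long seq = ++lastSeq;
      for (TableFile f : buffer) {
        table.add(new TableFile(f.commitId(), f.isEqualityDelete(), seq));
      }
      buffer.clear();
    }

    List<TableFile> files() {
      return table;
    }
  }

  // Spec rule (partition matching omitted): an equality delete applies to a data file
  // only if the data file's sequence number is strictly less than the delete's.
  static boolean equalityDeleteApplies(TableFile data, TableFile delete) {
    return data.seq() < delete.seq();
  }

  // True if any committed equality delete applies to the (single) committed insert data file.
  static boolean insertIsDeleted(List<TableFile> files) {
    TableFile insert = files.stream().filter(f -> !f.isEqualityDelete()).findFirst().orElseThrow();
    return files.stream()
        .filter(TableFile::isEqualityDelete)
        .anyMatch(d -> equalityDeleteApplies(insert, d));
  }

  public static void main(String[] args) {
    // Reported timeline: C1's files arrive late and are committed together with C2's.
    Coordinator reported = new Coordinator();
    reported.commitSnapshot();        // steps 1-2: C1 times out, nothing buffered yet
    reported.onResponse("C1", false); // steps 3-4: late C1 insert data file is buffered
    reported.onResponse("C2", true);  // steps 5-6: C2 equality delete is buffered
    reported.commitSnapshot();        // step 7: one snapshot, same sequence number for both
    System.out.println("single snapshot, row deleted: " + insertIsDeleted(reported.files()));

    // For comparison: C1 and C2 committed as separate snapshots.
    Coordinator separate = new Coordinator();
    separate.onResponse("C1", false);
    separate.commitSnapshot();        // insert file gets sequence number 1
    separate.onResponse("C2", true);
    separate.commitSnapshot();        // delete file gets sequence number 2
    System.out.println("separate snapshots, row deleted: " + insertIsDeleted(separate.files()));
  }
}
```

Running `main` prints `single snapshot, row deleted: false` followed by `separate snapshots, row deleted: true`. In this model, the deletes are lost only because the late C1 inserts and the C2 deletes share one sequence number. That is consistent with the suspected edge case, but it does not by itself confirm that the real coordinator buffers late responses this way.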