anmol commented on code in PR #9641:
URL: https://github.com/apache/iceberg/pull/9641#discussion_r1802251828


##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java:
##########
@@ -52,20 +51,22 @@ public IcebergWriter(Table table, String tableName, 
IcebergSinkConfig config) {
 
   private void initNewWriter() {
     this.writer = Utilities.createTableWriter(table, tableName, config);
-    // FIXME: update this when the record converter is added
-    //  this.recordConverter = new RecordConverter(table, config);
+    this.recordConverter = new RecordConverter(table, config);

Review Comment:
   Thanks for the positive responses, @bryanck and @ajantha-bhat. Since the 
feature we are discussing is not in this repo yet, what would be the right place 
for the discussion? I think we can continue it in this issue: 
https://github.com/tabular-io/iceberg-kafka-connect/issues/303. Please share if 
you have other suggestions. 
   
   Some more background on the issue and the environment:
   
   Goal: Implement a CDC pipeline from Oracle RDS to Iceberg on S3
   
   Components:
   Source: Oracle RDS with LogMiner
   Kafka Connect Source Connector: Debezium-based (running on the MSK Connect 
platform, version 2.7.1; AWS does not support a higher version at the moment)
   Kafka Broker: MSK Provisioned (3.7.x.kraft)
   Kafka Connect Sink Connector: based on tabular-io/iceberg-kafka-connect 
(**v0.6.15**; cannot use a higher version due to incompatibility with Kafka 
Connect 2.7.1)
   
   Current State:
   The Sink Connector works fine for the most part, but it failed UAT because 
some records were not deleted.
   We suspect an edge case in the Worker-Coordinator communication. The 
following is a summary of the flow:
   1. Coordinator sends COMMIT_REQUEST C1.
   2. C1 times out. Coordinator attempts a partial commit, with no data 
committed.
   3. Workers handle COMMIT_REQUEST C1 and send COMMIT_RESPONSE and COMMIT_READY 
for C1. The insert data files are created during this response.
   4. Coordinator receives COMMIT_RESPONSE and COMMIT_READY for C1. It logs 
that no commit is currently in progress; however, the commit buffers would 
still be updated (correct me if I'm wrong).
   5. Coordinator sends COMMIT_REQUEST C2.
   6. Workers handle COMMIT_REQUEST C2 and send COMMIT_RESPONSE and COMMIT_READY 
for C2. The equality deletes (corresponding to the insert data files from C1) 
are created during this response.
   7. C2 times out. Coordinator attempts a partial commit, and all new data is 
committed in a single snapshot.
   8. In Iceberg, the insert data files and the equality delete files are then 
part of the same snapshot with the same sequence number. Because Iceberg 
applies an equality delete file only to data files whose sequence number is 
STRICTLY LESS than the delete file's sequence number, the equality delete 
files are ignored.
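
   To make step 8 concrete, here is a minimal sketch of the sequence-number 
rule as I understand it from the table spec. The class and method names are 
hypothetical and are not Iceberg's actual API; the sketch also ignores the 
partition-matching part of the rule.

```java
// Hypothetical illustration only; these are not Iceberg classes.
final class EqualityDeleteScopeSketch {

  // An equality delete file applies to a data file only when the data file's
  // sequence number is strictly less than the delete file's sequence number.
  // (Partition matching is deliberately left out of this sketch.)
  static boolean equalityDeleteApplies(long dataFileSeq, long deleteFileSeq) {
    return dataFileSeq < deleteFileSeq;
  }

  public static void main(String[] args) {
    // Suspected failure case: inserts from C1 and deletes from C2 are
    // committed in one snapshot, so both get the same sequence number.
    long sharedSeq = 5L; // arbitrary value chosen for the example
    System.out.println(equalityDeleteApplies(sharedSeq, sharedSeq)); // false

    // Expected case: inserts committed in an earlier snapshot than the deletes.
    System.out.println(equalityDeleteApplies(5L, 6L)); // true
  }
}
```

   If that reading is right, the deletes would only take effect when the 
insert data files land in an earlier snapshot than the equality delete files.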



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

