rmoff opened a new issue, #13457: URL: https://github.com/apache/iceberg/issues/13457
### Apache Iceberg version None ### Query engine None ### Please describe the bug ๐ ## Summary If I use ``` "iceberg.tables":"tmp.static_orders_json", ``` The table is written and committed to fine, and I can read the data If I use ``` "iceberg.tables.dynamic-enabled": "true", "iceberg.tables.route-field":"srcTopic", "transforms" : "addDbPrefix, insertTopic", "transforms.addDbPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter", "transforms.addDbPrefix.regex" : ".*", "transforms.addDbPrefix.replacement" : "tmp.dynamic_\$0", "transforms.insertTopic.type" : "org.apache.kafka.connect.transforms.InsertField\$Value", "transforms.insertTopic.topic.field" : "srcTopic" ``` files are written to S3, but no snapshot and thus no rows are returned when querying the data. The intention here is to be able to route multiple topics to multiple tables, using the topic name as the basis for the target table. ## Steps to reproduce I'm using Kafka Connect + Kafka running locally, with AWS Glue catalog and AWS S3 for storage. No existing tables in the Glue database, or files on S3: ``` $ aws s3 --recursive ls s3://rmoff-lakehouse/00 $ aws glue get-tables --database-name tmp --query 'TableList[].Name' --output table --region us-east-1 ```` Populate a new Kafka topic: ``` echo '{"order_id": "001", "customer_id": "cust_123", "product": "laptop", "quantity": 1, "price": 999.99} {"order_id": "002", "customer_id": "cust_456", "product": "mouse", "quantity": 2, "price": 25.50} {"order_id": "003", "customer_id": "cust_789", "product": "keyboard", "quantity": 1, "price": 75.00} {"order_id": "004", "customer_id": "cust_321", "product": "monitor", "quantity": 1, "price": 299.99} {"order_id": "005", "customer_id": "cust_654", "product": "headphones", "quantity": 1, "price": 149.99}' | docker compose exec -T kcat kcat -P -b broker:9092 -t orders_json ``` Confirm that the data is there ``` docker compose exec -it kcat kcat -b broker:9092 -C -t orders_json {"order_id": "001", "customer_id": "cust_123", "product": "laptop", "quantity": 1, "price": 999.99} {"order_id": "002", "customer_id": "cust_456", "product": "mouse", "quantity": 2, "price": 25.50} {"order_id": "003", "customer_id": "cust_789", "product": "keyboard", "quantity": 1, "price": 75.00} {"order_id": "004", "customer_id": "cust_321", "product": "monitor", "quantity": 1, "price": 299.99} {"order_id": "005", "customer_id": "cust_654", "product": "headphones", "quantity": 1, "price": 149.99} ``` Create a static connector, reading from a fixed topic writing to a fixed table ``` kcctl apply -f - <<EOF { "name": "iceberg-sink-static", "config": { "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector", "topics.regex": "orders_json", "iceberg.tables":"tmp.static_orders_json", "iceberg.tables.auto-create-enabled": "true", "iceberg.tables.evolve-schema-enabled": "true", "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog", "iceberg.catalog.warehouse": "s3://rmoff-lakehouse/00/", "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO", "key.converter":"org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable":"false", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable":"false", "iceberg.control.commit.interval-ms": "1000" } } EOF ``` Confirm connector is running: ``` kcctl describe connector iceberg-sink-static Name: iceberg-sink-static Type: sink State: RUNNING Worker ID: kafka-connect:8083 Config: connector.class: io.tabular.iceberg.connect.IcebergSinkConnector iceberg.catalog.catalog-impl: org.apache.iceberg.aws.glue.GlueCatalog iceberg.catalog.io-impl: org.apache.iceberg.aws.s3.S3FileIO iceberg.catalog.warehouse: s3://rmoff-lakehouse/00/ iceberg.control.commit.interval-ms: 1000 iceberg.tables: tmp.static_orders_json iceberg.tables.auto-create-enabled: true iceberg.tables.evolve-schema-enabled: true key.converter: org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable: false name: iceberg-sink-static topics.regex: orders_json value.converter: org.apache.kafka.connect.json.JsonConverter value.converter.schemas.enable: false Tasks: 0: State: RUNNING Worker ID: kafka-connect:8083 Topics: orders_json ``` Confirm table is present: ``` โฏ aws glue get-tables --database-name tmp --query 'TableList[].Name' --output table --region us-east-1 ------------------------ | GetTables | +----------------------+ | static_orders_json | +----------------------+ ``` Check S3 files: ``` โฏ aws s3 --recursive ls s3://rmoff-lakehouse/00 2025-07-03 17:34:21 1635 00/tmp.db/static_orders_json/data/00001-1751560405549-e180131f-fd33-46db-af68-6cdadd0f360f-00001.parquet 2025-07-03 17:33:25 1327 00/tmp.db/static_orders_json/metadata/00000-27c1ccf9-c5cc-436c-9246-fe6bd9d0041f.metadata.json 2025-07-03 17:34:23 2550 00/tmp.db/static_orders_json/metadata/00001-53d33894-c660-42b2-8eac-5c695b9edada.metadata.json 2025-07-03 17:34:22 6961 00/tmp.db/static_orders_json/metadata/18cfb817-5b80-4ae7-bd99-a927e96d64ae-m0.avro 2025-07-03 17:34:22 4242 00/tmp.db/static_orders_json/metadata/snap-6206227594069763789-1-18cfb817-5b80-4ae7-bd99-a927e96d64ae.avro ``` Check the data with DuckDB: ``` ๐กโ CREATE SECRET iceberg_secret ( TYPE S3, PROVIDER credential_chain ); โโโโโโโโโโโ โ Success โ โ boolean โ โโโโโโโโโโโค โ true โ โโโโโโโโโโโ Run Time (s): real 0.025 user 0.140766 sys 0.020710 ๐กโ ATTACH '052821163812' AS glue_catalog ( TYPE iceberg, ENDPOINT_TYPE glue); Run Time (s): real 0.487 user 0.172899 sys 0.030617 ๐กโ USE glue_catalog.tmp; Run Time (s): real 0.783 user 0.789097 sys 0.003318 ๐กโ SELECT * FROM static_orders_json; โโโโโโโโโโโโโโฌโโโโโโโโโโโฌโโโโโโโโโฌโโโโโโโโโโโโโโฌโโโโโโโโโโโ โ product โ quantity โ price โ customer_id โ order_id โ โ varchar โ int64 โ double โ varchar โ varchar โ โโโโโโโโโโโโโโผโโโโโโโโโโโผโโโโโโโโโผโโโโโโโโโโโโโโผโโโโโโโโโโโค โ laptop โ 1 โ 999.99 โ cust_123 โ 001 โ โ mouse โ 2 โ 25.5 โ cust_456 โ 002 โ โ keyboard โ 1 โ 75.0 โ cust_789 โ 003 โ โ monitor โ 1 โ 299.99 โ cust_321 โ 004 โ โ headphones โ 1 โ 149.99 โ cust_654 โ 005 โ โโโโโโโโโโโโโโดโโโโโโโโโโโดโโโโโโโโโดโโโโโโโโโโโโโโดโโโโโโโโโโโ Run Time (s): real 2.655 user 0.164638 sys 0.054792 ๐กโ ``` Now try to do the same thing, but using dynamic table routing (on one table for now, but the purpose is to work with many) ``` kcctl apply -f - <<EOF { "name": "iceberg-sink-dynamic", "config": { "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector", "topics": "orders_json", "iceberg.tables.dynamic-enabled": "true", "iceberg.tables.route-field":"srcTopic", "iceberg.tables.auto-create-enabled": "true", "iceberg.tables.evolve-schema-enabled": "true", "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog", "iceberg.catalog.warehouse": "s3://rmoff-lakehouse/00/", "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO", "key.converter":"org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable":"false", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable":"false", "iceberg.control.commit.interval-ms": "1000", "transforms" : "addDbPrefix, insertTopic", "transforms.addDbPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter", "transforms.addDbPrefix.regex" : ".*", "transforms.addDbPrefix.replacement" : "tmp.dynamic_\$0", "transforms.insertTopic.type" : "org.apache.kafka.connect.transforms.InsertField\$Value", "transforms.insertTopic.topic.field" : "srcTopic" } } EOF ``` Confirm connector is running ``` kcctl describe connector iceberg-sink-dynamic Name: iceberg-sink-dynamic Type: sink State: RUNNING Worker ID: kafka-connect:8083 Config: connector.class: io.tabular.iceberg.connect.IcebergSinkConnector iceberg.catalog.catalog-impl: org.apache.iceberg.aws.glue.GlueCatalog iceberg.catalog.io-impl: org.apache.iceberg.aws.s3.S3FileIO iceberg.catalog.warehouse: s3://rmoff-lakehouse/00/ iceberg.control.commit.interval-ms: 1000 iceberg.tables.auto-create-enabled: true iceberg.tables.dynamic-enabled: true iceberg.tables.evolve-schema-enabled: true iceberg.tables.route-field: srcTopic key.converter: org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable: false name: iceberg-sink-dynamic topics: orders_json transforms: addDbPrefix, insertTopic transforms.addDbPrefix.regex: .* transforms.addDbPrefix.replacement: tmp.dynamic_$0 transforms.addDbPrefix.type: org.apache.kafka.connect.transforms.RegexRouter transforms.insertTopic.topic.field: srcTopic transforms.insertTopic.type: org.apache.kafka.connect.transforms.InsertField$Value value.converter: org.apache.kafka.connect.json.JsonConverter value.converter.schemas.enable: false Tasks: 0: State: RUNNING Worker ID: kafka-connect:8083 Topics: orders_json ``` Confirm new table: ``` โฏ aws glue get-tables --database-name tmp --query 'TableList[].Name' --output table --region us-east-1 ------------------------- | GetTables | +-----------------------+ | dynamic_orders_json | | static_orders_json | +-----------------------+ ``` S3 - note no snap files ``` โฏ aws s3 --recursive ls s3://rmoff-lakehouse/00 2025-07-03 17:39:04 2058 00/tmp.db/dynamic_orders_json/data/00001-1751560743209-4cbce9d3-4021-445c-858c-894b727ff56d-00001.parquet 2025-07-03 17:39:03 1430 00/tmp.db/dynamic_orders_json/metadata/00000-3cf5ad66-e360-498c-a9c6-1127f6f7387b.metadata.json 2025-07-03 17:34:21 1635 00/tmp.db/static_orders_json/data/00001-1751560405549-e180131f-fd33-46db-af68-6cdadd0f360f-00001.parquet 2025-07-03 17:33:25 1327 00/tmp.db/static_orders_json/metadata/00000-27c1ccf9-c5cc-436c-9246-fe6bd9d0041f.metadata.json 2025-07-03 17:34:23 2550 00/tmp.db/static_orders_json/metadata/00001-53d33894-c660-42b2-8eac-5c695b9edada.metadata.json 2025-07-03 17:34:22 6961 00/tmp.db/static_orders_json/metadata/18cfb817-5b80-4ae7-bd99-a927e96d64ae-m0.avro 2025-07-03 17:34:22 4242 00/tmp.db/static_orders_json/metadata/snap-6206227594069763789-1-18cfb817-5b80-4ae7-bd99-a927e96d64ae.avro ``` No rows returned from querying: ``` ๐กโ SELECT * FROM dynamic_orders_json; โโโโโโโโโโโฌโโโโโโโโโโโฌโโโโโโโโโโโฌโโโโโโโโโโโโโโฌโโโโโโโโโโโฌโโโโโโโโโ โ product โ srctopic โ quantity โ customer_id โ order_id โ price โ โ varchar โ varchar โ int64 โ varchar โ varchar โ double โ โโโโโโโโโโโดโโโโโโโโโโโดโโโโโโโโโโโดโโโโโโโโโโโโโโดโโโโโโโโโโโดโโโโโโโโโค โ 0 rows โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ Run Time (s): real 1.490 user 0.069060 sys 0.012978 ๐กโ ``` ## Logs ### Static connector ``` [2025-07-03 16:33:25,142] INFO [iceberg-sink-static|task-0] Successfully committed to table iceberg.tmp.static_orders_json in 1180 ms (org.apache.iceberg.BaseMetastoreTableOperations:139) [2025-07-03 16:33:25,146] INFO [iceberg-sink-static|task-0] Created new table tmp.static_orders_json from record at topic: orders_json, partition: 0, offset: 0 (io.tabular.iceberg.connect.data.IcebergWriterFactory:109) [2025-07-03 16:33:25,405] INFO [iceberg-sink-static|task-0] Refreshing table metadata from new version: s3://rmoff-lakehouse/00/tmp.db/static_orders_json/metadata/00000-27c1ccf9-c5cc-436c-9246-fe6bd9d0041f.metadata.json (org.apache.iceberg.BaseMetastoreTableOperations:199) [2025-07-03 16:33:25,584] INFO [iceberg-sink-static|task-0] Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool:153) [2025-07-03 16:33:52,741] INFO Commit timeout reached. Now: 1751560432740, start: 1751560402538, timeout: 30000 (io.tabular.iceberg.connect.channel.CommitState:111) [2025-07-03 16:33:52,741] INFO Processing commit after responses for ac5561a3-1eff-44d7-81aa-89ec6f1cf9e4, isPartialCommit true (io.tabular.iceberg.connect.channel.Coordinator:132) [2025-07-03 16:33:52,784] INFO Commit ac5561a3-1eff-44d7-81aa-89ec6f1cf9e4 complete, committed to 0 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator:163) [2025-07-03 16:33:52,785] INFO Started new commit with commit-id=bcac4ac8-8b2d-41f5-a9b2-99102859aaa3 (io.tabular.iceberg.connect.channel.Coordinator:100) [2025-07-03 16:33:52,816] INFO Sent workers commit trigger with commit-id=bcac4ac8-8b2d-41f5-a9b2-99102859aaa3 (io.tabular.iceberg.connect.channel.Coordinator:104) [2025-07-03 16:34:20,284] INFO [Producer clientId=b0614ac5-e8f7-4d8e-b86f-a10d4b8fa878, transactionalId=committer-txn-88f4a1e2-30ec-43c4-b925-8664bfd1b2b2-0] Discovered group coordinator broker:9092 (id: 1 rack: null isFenced: false) (org.apache.kafka.clients.producer.internals.TransactionManager:1633) [2025-07-03 16:34:20,424] INFO Commit bcac4ac8-8b2d-41f5-a9b2-99102859aaa3 not ready, received responses for 0 of 3 partitions, waiting for more (io.tabular.iceberg.connect.channel.CommitState:136) [2025-07-03 16:34:20,533] INFO Commit bcac4ac8-8b2d-41f5-a9b2-99102859aaa3 ready, received responses for all 3 partitions (io.tabular.iceberg.connect.channel.CommitState:129) [2025-07-03 16:34:20,533] INFO Processing commit after responses for bcac4ac8-8b2d-41f5-a9b2-99102859aaa3, isPartialCommit false (io.tabular.iceberg.connect.channel.Coordinator:132) [2025-07-03 16:34:20,535] INFO Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO (org.apache.iceberg.CatalogUtil:327) [2025-07-03 16:34:20,785] INFO Refreshing table metadata from new version: s3://rmoff-lakehouse/00/tmp.db/static_orders_json/metadata/00000-27c1ccf9-c5cc-436c-9246-fe6bd9d0041f.metadata.json (org.apache.iceberg.BaseMetastoreTableOperations:199) [2025-07-03 16:34:21,156] INFO Table loaded by catalog: iceberg.tmp.static_orders_json (org.apache.iceberg.BaseMetastoreCatalog:69) [2025-07-03 16:34:22,087] INFO Committed snapshot 6206227594069763789 (MergeAppend) (org.apache.iceberg.SnapshotProducer:422) [2025-07-03 16:34:22,350] INFO Received metrics report: CommitReport{tableName=iceberg.tmp.static_orders_json, snapshotId=6206227594069763789, sequenceNumber=1, operation=append, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.871066375S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=1} [2025-07-03 16:34:23,328] INFO Successfully committed to table iceberg.tmp.static_orders_json in 748 ms (org.apache.iceberg.BaseMetastoreTableOperations:139) [2025-07-03 16:34:23,568] INFO Refreshing table metadata from new version: s3://rmoff-lakehouse/00/tmp.db/static_orders_json/metadata/00001-53d33894-c660-42b2-8eac-5c695b9edada.metadata.json (org.apache.iceberg.BaseMetastoreTableOperations:199) [2025-07-03 16:34:23,982] INFO Commit complete to table tmp.static_orders_json, snapshot 6206227594069763789, commit ID bcac4ac8-8b2d-41f5-a9b2-99102859aaa3, vtts null (io.tabular.iceberg.connect.channel.Coordinator:270) ``` ### Dynamic connector ``` [2025-07-03 16:39:02,869] INFO [iceberg-sink-dynamic|task-0] Successfully committed to table iceberg.tmp.dynamic_orders_json in 1050 ms (org.apache.iceberg.BaseMetastoreTableOperations:139) [2025-07-03 16:39:02,870] INFO [iceberg-sink-dynamic|task-0] Created new table tmp.dynamic_orders_json from record at topic: tmp.dynamic_orders_json, partition: 0, offset: 0 (io.tabular.iceberg.connect.data.IcebergWriterFactory:109) [2025-07-03 16:39:03,086] INFO [iceberg-sink-dynamic|task-0] Refreshing table metadata from new version: s3://rmoff-lakehouse/00/tmp.db/dynamic_orders_json/metadata/00000-3cf5ad66-e360-498c-a9c6-1127f6f7387b.metadata.json (org.apache.iceberg.BaseMetastoreTableOperations:199) [2025-07-03 16:39:03,215] INFO [iceberg-sink-dynamic|task-0] Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool:153) [2025-07-03 16:39:03,477] INFO [Producer clientId=83a6493d-860a-471a-8306-332d1d2b3bb8, transactionalId=committer-txn-361e28ec-a644-4ee4-9f37-8b6e3f2f1223-0] Discovered group coordinator broker:9092 (id: 1 rack: null isFenced: false) (org.apache.kafka.clients.producer.internals.TransactionManager:1633) [2025-07-03 16:39:22,743] INFO Commit timeout reached. Now: 1751560762743, start: 1751560731822, timeout: 30000 (io.tabular.iceberg.connect.channel.CommitState:111) [2025-07-03 16:39:22,743] INFO Processing commit after responses for 1023c504-2eab-4198-ae4e-fa0e02b9d1f7, isPartialCommit true (io.tabular.iceberg.connect.channel.Coordinator:132) [2025-07-03 16:39:22,761] INFO Commit 1023c504-2eab-4198-ae4e-fa0e02b9d1f7 complete, committed to 0 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator:163) [2025-07-03 16:39:22,761] INFO Started new commit with commit-id=895ebff8-db44-4262-ab26-b3fa40ef2ee4 (io.tabular.iceberg.connect.channel.Coordinator:100) [2025-07-03 16:39:22,788] INFO Sent workers commit trigger with commit-id=895ebff8-db44-4262-ab26-b3fa40ef2ee4 (io.tabular.iceberg.connect.channel.Coordinator:104) [2025-07-03 16:39:30,816] INFO Commit timeout reached. Now: 1751560770815, start: 1751560740666, timeout: 30000 (io.tabular.iceberg.connect.channel.CommitState:111) [2025-07-03 16:39:30,816] INFO Processing commit after responses for 23ce260c-b301-4662-9808-8e0eaf35d142, isPartialCommit true (io.tabular.iceberg.connect.channel.Coordinator:132) [2025-07-03 16:39:30,836] INFO Commit 23ce260c-b301-4662-9808-8e0eaf35d142 complete, committed to 0 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator:163) [2025-07-03 16:39:30,836] INFO Started new commit with commit-id=6a2bf54a-b138-41d3-a272-1f17642bd7f6 (io.tabular.iceberg.connect.channel.Coordinator:100) [2025-07-03 16:39:30,864] INFO Sent workers commit trigger with commit-id=6a2bf54a-b138-41d3-a272-1f17642bd7f6 (io.tabular.iceberg.connect.channel.Coordinator:104) [2025-07-03 16:39:52,905] INFO Commit timeout reached. Now: 1751560792904, start: 1751560762761, timeout: 30000 (io.tabular.iceberg.connect.channel.CommitState:111) [2025-07-03 16:39:52,905] INFO Processing commit after responses for 895ebff8-db44-4262-ab26-b3fa40ef2ee4, isPartialCommit true (io.tabular.iceberg.connect.channel.Coordinator:132) [2025-07-03 16:39:52,924] INFO Commit 895ebff8-db44-4262-ab26-b3fa40ef2ee4 complete, committed to 0 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator:163) [2025-07-03 16:39:52,925] INFO Started new commit with commit-id=2d783e91-9a8f-4888-a96e-7a3e0c066a34 (io.tabular.iceberg.connect.channel.Coordinator:100) [2025-07-03 16:39:52,955] INFO Sent workers commit trigger with commit-id=2d783e91-9a8f-4888-a96e-7a3e0c066a34 (io.tabular.iceberg.connect.channel.Coordinator:104) [2025-07-03 16:40:00,971] INFO Commit timeout reached. Now: 1751560800971, start: 1751560770836, timeout: 30000 (io.tabular.iceberg.connect.channel.CommitState:111) [2025-07-03 16:40:00,971] INFO Processing commit after responses for 6a2bf54a-b138-41d3-a272-1f17642bd7f6, isPartialCommit true (io.tabular.iceberg.connect.channel.Coordinator:132) [2025-07-03 16:40:00,983] INFO Commit 6a2bf54a-b138-41d3-a272-1f17642bd7f6 complete, committed to 0 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator:163) [2025-07-03 16:40:00,983] INFO Started new commit with commit-id=c07ff988-75f1-48c6-9767-901dc531749a (io.tabular.iceberg.connect.channel.Coordinator:100) [2025-07-03 16:40:01,010] INFO Sent workers commit trigger with commit-id=c07ff988-75f1-48c6-9767-901dc531749a (io.tabular.iceberg.connect.channel.Coordinator:104) [2025-07-03 16:40:03,480] INFO [iceberg-sink-dynamic|task-0] [Producer clientId=83a6493d-860a-471a-8306-332d1d2b3bb8, transactionalId=committer-txn-361e28ec-a644-4ee4-9f37-8b6e3f2f1223-0] Aborting incomplete transaction (org.apache.kafka.clients.producer.KafkaProducer:811) [2025-07-03 16:40:20,718] INFO Commit 2d783e91-9a8f-4888-a96e-7a3e0c066a34 not ready, received responses for 0 of 3 partitions, waiting for more (io.tabular.iceberg.connect.channel.CommitState:136) [2025-07-03 16:40:20,723] INFO Commit 2d783e91-9a8f-4888-a96e-7a3e0c066a34 not ready, received responses for 0 of 3 partitions, waiting for more (io.tabular.iceberg.connect.channel.CommitState:136) [2025-07-03 16:40:20,734] INFO Commit 2d783e91-9a8f-4888-a96e-7a3e0c066a34 not ready, received responses for 0 of 3 partitions, waiting for more (io.tabular.iceberg.connect.channel.CommitState:136) [2025-07-03 16:40:20,738] INFO Commit 2d783e91-9a8f-4888-a96e-7a3e0c066a34 ready, received responses for all 3 partitions (io.tabular.iceberg.connect.channel.CommitState:129) [2025-07-03 16:40:20,738] INFO Processing commit after responses for 2d783e91-9a8f-4888-a96e-7a3e0c066a34, isPartialCommit false (io.tabular.iceberg.connect.channel.Coordinator:132) [2025-07-03 16:40:20,752] INFO Commit 2d783e91-9a8f-4888-a96e-7a3e0c066a34 complete, committed to 0 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator:163) [2025-07-03 16:40:21,757] INFO Started new commit with commit-id=142d30f9-b010-4f6f-9990-e867118f06b0 (io.tabular.iceberg.connect.channel.Coordinator:100) [2025-07-03 16:40:21,763] INFO Sent workers commit trigger with commit-id=142d30f9-b010-4f6f-9990-e867118f06b0 (io.tabular.iceberg.connect.channel.Coordinator:104) [2025-07-03 16:40:31,794] INFO Commit timeout reached. Now: 1751560831794, start: 1751560800983, timeout: 30000 (io.tabular.iceberg.connect.channel.CommitState:111) [2025-07-03 16:40:31,794] INFO Processing commit after responses for c07ff988-75f1-48c6-9767-901dc531749a, isPartialCommit true (io.tabular.iceberg.connect.channel.Coordinator:132) [2025-07-03 16:40:31,809] INFO Commit c07ff988-75f1-48c6-9767-901dc531749a complete, committed to 0 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator:163) [2025-07-03 16:40:31,809] INFO Started new commit with commit-id=435e5dcc-d0cb-4d20-a033-8a7329a07f9a (io.tabular.iceberg.connect.channel.Coordinator:100) [2025-07-03 16:40:31,814] INFO Sent workers commit trigger with commit-id=435e5dcc-d0cb-4d20-a033-8a7329a07f9a (io.tabular.iceberg.connect.channel.Coordinator:104) [2025-07-03 16:40:51,857] INFO Commit timeout reached. Now: 1751560851857, start: 1751560821757, timeout: 30000 (io.tabular.iceberg.connect.channel.CommitState:111) [2025-07-03 16:40:51,857] INFO Processing commit after responses for 142d30f9-b010-4f6f-9990-e867118f06b0, isPartialCommit true (io.tabular.iceberg.connect.channel.Coordinator:132) [2025-07-03 16:40:51,871] INFO Commit 142d30f9-b010-4f6f-9990-e867118f06b0 complete, committed to 0 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator:163) [2025-07-03 16:40:51,871] INFO Started new commit with commit-id=ca44186c-36ab-4064-b258-799b431470d2 (io.tabular.iceberg.connect.channel.Coordinator:100) [2025-07-03 16:40:51,901] INFO Sent workers commit trigger with commit-id=ca44186c-36ab-4064-b258-799b431470d2 (io.tabular.iceberg.connect.channel.Coordinator:104) [2025-07-03 16:41:01,922] INFO Commit timeout reached. Now: 1751560861922, start: 1751560831809, timeout: 30000 (io.tabular.iceberg.connect.channel.CommitState:111) [2025-07-03 16:41:01,923] INFO Processing commit after responses for 435e5dcc-d0cb-4d20-a033-8a7329a07f9a, isPartialCommit true (io.tabular.iceberg.connect.channel.Coordinator:132) [2025-07-03 16:41:01,947] INFO Commit 435e5dcc-d0cb-4d20-a033-8a7329a07f9a complete, committed to 0 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator:163) [2025-07-03 16:41:01,947] INFO Started new commit with commit-id=605f92b4-f3b5-4a48-9263-6c7d88e2f1e9 (io.tabular.iceberg.connect.channel.Coordinator:100) [2025-07-03 16:41:01,953] INFO Sent workers commit trigger with commit-id=605f92b4-f3b5-4a48-9263-6c7d88e2f1e9 (io.tabular.iceberg.connect.channel.Coordinator:104) [2025-07-03 16:41:03,481] WARN [iceberg-sink-dynamic|task-0] Error aborting producer transaction (io.tabular.iceberg.connect.channel.Channel:115) org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000ms while awaiting EndTxn(false) [2025-07-03 16:41:03,485] ERROR [iceberg-sink-dynamic|task-0] WorkerSinkTask{id=iceberg-sink-dynamic-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Timeout expired after 60000ms while awaiting TxnOffsetCommitHandler (org.apache.kafka.connect.runtime.WorkerSinkTask:634) org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000ms while awaiting TxnOffsetCommitHandler [2025-07-03 16:41:03,485] INFO [iceberg-sink-dynamic|task-0] Channel stopping (io.tabular.iceberg.connect.channel.Channel:163) ``` ### Willingness to contribute - [ ] I can contribute a fix for this bug independently - [ ] I would be willing to contribute a fix for this bug with guidance from the Iceberg community - [ ] I cannot contribute a fix for this bug at this time -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org