jxwnhj0717 commented on issue #4550: URL: https://github.com/apache/iceberg/issues/4550#issuecomment-1381352493
I added logging around acquiring and releasing locks in `LockManagers$InMemoryLockManager` and reproduced the bad-table issue:

```
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,672 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - acquiring lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/27a6b56d-7747-4a7b-9da1-694c5c6f8b3b.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,672 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - acquiring lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,672 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - acquired lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,672 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - acquired lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/27a6b56d-7747-4a7b-9da1-694c5c6f8b3b.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,672 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - add lock heartbeat. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/27a6b56d-7747-4a7b-9da1-694c5c6f8b3b.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,672 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - add lock heartbeat. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_docsource_type (1/1)#2] 2023-01-11 18:46:03,674 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - releasing lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/22f4f9c5-35d2-467c-8263-d26fd6548017.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_docsource_type (1/1)#2] 2023-01-11 18:46:03,674 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - released lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/22f4f9c5-35d2-467c-8263-d26fd6548017.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_docsource_type (1/1)#2] 2023-01-11 18:46:03,674 INFO org.apache.iceberg.hadoop.HadoopTableOperations [] - Committed a new metadata file hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/v749.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_province (1/1)#2] 2023-01-11 18:46:03,677 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - releasing lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/ced05720-8f32-49c9-884d-49b604290622.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_province (1/1)#2] 2023-01-11 18:46:03,677 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - released lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/ced05720-8f32-49c9-884d-49b604290622.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_province (1/1)#2] 2023-01-11 18:46:03,677 INFO org.apache.iceberg.hadoop.HadoopTableOperations [] - Committed a new metadata file hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/v749.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,679 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - releasing lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/27a6b56d-7747-4a7b-9da1-694c5c6f8b3b.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,679 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - releasing lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,679 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - released lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,679 INFO org.apache.iceberg.hadoop.HadoopTableOperations [] - Committed a new metadata file hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,684 WARN org.apache.flink.runtime.taskmanager.Task [] - IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city (1/1)#2 (5dec3e28251735766bb3eb423ca5a45c) switched from RUNNING to FAILED with failure cause: java.lang.NullPointerException
    at org.apache.iceberg.util.LockManagers$InMemoryLockManager.release(LockManagers.java:235)
    at org.apache.iceberg.hadoop.HadoopTableOperations.renameToFinal(HadoopTableOperations.java:377)
    at org.apache.iceberg.hadoop.HadoopTableOperations.commit(HadoopTableOperations.java:159)
    at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:317)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:214)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:198)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:190)
    at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:295)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitOperation(IcebergFilesCommitter.java:312)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitDeltaTxn(IcebergFilesCommitter.java:276)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:218)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.notifyCheckpointComplete(IcebergFilesCommitter.java:188)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:334)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1171)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$10(StreamTask.java:1136)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$12(StreamTask.java:1159)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
```
The code is as follows:

```java
@VisibleForTesting
void acquireOnce(String entityId, String ownerId) {
  InMemoryLockContent content = LOCKS.get(entityId);
  if (content != null && content.expireMs() > System.currentTimeMillis()) {
    throw new IllegalStateException(String.format(
        "Lock for %s currently held by %s, expiration: %s",
        entityId, content.ownerId(), content.expireMs()));
  }

  LOG.info("acquiring lock. entityId:{} ownerId:{}", entityId, ownerId);
  long expiration = System.currentTimeMillis() + heartbeatTimeoutMs();
  boolean succeed;
  if (content == null) {
    InMemoryLockContent previous = LOCKS.putIfAbsent(
        entityId, new InMemoryLockContent(ownerId, expiration));
    succeed = previous == null;
  } else {
    succeed = LOCKS.replace(entityId, content, new InMemoryLockContent(ownerId, expiration));
  }

  if (succeed) {
    LOG.info("acquired lock. entityId:{} ownerId:{}", entityId, ownerId);
    // cleanup old heartbeat
    if (HEARTBEATS.containsKey(entityId)) {
      HEARTBEATS.remove(entityId).cancel(false);
      LOG.info("remove lock heartbeat. entityId:{} ownerId:{}", entityId, ownerId);
    }

    HEARTBEATS.put(entityId, scheduler().scheduleAtFixedRate(() -> {
      InMemoryLockContent lastContent = LOCKS.get(entityId);
      try {
        long newExpiration = System.currentTimeMillis() + heartbeatTimeoutMs();
        LOCKS.replace(entityId, lastContent, new InMemoryLockContent(ownerId, newExpiration));
      } catch (NullPointerException e) {
        throw new RuntimeException("Cannot heartbeat to a deleted lock " + entityId, e);
      }
    }, 0, heartbeatIntervalMs(), TimeUnit.MILLISECONDS));
    LOG.info("add lock heartbeat. entityId:{} ownerId:{}", entityId, ownerId);
  } else {
    throw new IllegalStateException("Unable to acquire lock " + entityId);
  }
}

@Override
public boolean release(String entityId, String ownerId) {
  LOG.info("releasing lock. entityId:{} ownerId:{}", entityId, ownerId);
  InMemoryLockContent currentContent = LOCKS.get(entityId);
  if (currentContent == null) {
    LOG.error("Cannot find lock for entity {}", entityId);
    return false;
  }

  if (!currentContent.ownerId().equals(ownerId)) {
    LOG.error("Cannot unlock {} by {}, current owner: {}", entityId, ownerId, currentContent.ownerId());
    return false;
  }

  HEARTBEATS.remove(entityId).cancel(false);
  LOCKS.remove(entityId);
  LOG.info("released lock. entityId:{} ownerId:{}", entityId, ownerId);
  return true;
}
```

`HEARTBEATS.remove(entityId)` returns null when the lock is released. Strangely, the heartbeat for this entityId was added at 2023-01-11 18:46:03,672, yet at 2023-01-11 18:46:03,684 the corresponding heartbeat can no longer be found, even though the whole process is single-threaded.
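To make the failure mechanism at `LockManagers.java:235` concrete: `ConcurrentHashMap.remove(key)` returns null when the key is absent, so the unguarded `HEARTBEATS.remove(entityId).cancel(false)` in `release()` throws as soon as the heartbeat entry has already disappeared. Below is a minimal standalone sketch of that behavior; the class name, entityId value, and intervals are invented for illustration, and the null-safe variant at the end is only a sketch, not a proposed patch:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class HeartbeatNpeDemo {

  // Mirrors the HEARTBEATS map used by InMemoryLockManager in the snippet above.
  private static final ConcurrentHashMap<String, ScheduledFuture<?>> HEARTBEATS = new ConcurrentHashMap<>();

  public static void main(String[] args) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    String entityId = "hdfs://example/metadata/v749.metadata.json"; // hypothetical entity id

    // acquireOnce() registers a heartbeat task for the entity.
    HEARTBEATS.put(entityId, scheduler.scheduleAtFixedRate(() -> { }, 0, 100, TimeUnit.MILLISECONDS));

    // First release: the entry exists, remove() returns the future and cancel() succeeds.
    HEARTBEATS.remove(entityId).cancel(false);

    // If the same step runs again after the entry is gone, remove() returns null and the
    // unguarded cancel() throws the NPE seen in the stack trace above.
    try {
      HEARTBEATS.remove(entityId).cancel(false);
    } catch (NullPointerException e) {
      System.out.println("NPE, same as InMemoryLockManager.release: " + e);
    }

    // Null-safe variant of the same step (sketch only):
    ScheduledFuture<?> heartbeat = HEARTBEATS.remove(entityId);
    if (heartbeat != null) {
      heartbeat.cancel(false);
    }

    scheduler.shutdownNow();
  }
}
```

This only shows how the NPE is produced once the entry is missing; it does not explain what removed the heartbeat between 18:46:03,672 and 18:46:03,684 in a single-threaded committer, which is the open question here.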