jxwnhj0717 commented on issue #4550:
URL: https://github.com/apache/iceberg/issues/4550#issuecomment-1381352493

   I added logging around lock acquisition and release in 
LockManagers$InMemoryLockManager and reproduced the bad table issue:
   `
   [IcebergFilesCommitter -> Sink: IcebergSink 
hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,672 INFO  
org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - acquiring 
lock. 
entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/v749.metadata.json
 
ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/27a6b56d-7747-4a7b-9da1-694c5c6f8b3b.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink 
hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,672 INFO  
org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - acquiring 
lock. 
entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json
 
ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink 
hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,672 INFO  
org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - acquired 
lock. 
entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json
 
ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink 
hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,672 INFO  
org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - acquired 
lock. 
entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/v749.metadata.json
 
ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/27a6b56d-7747-4a7b-9da1-694c5c6f8b3b.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink 
hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,672 INFO  
org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - add lock 
heartbeat. 
entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/v749.metadata.json
 
ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/27a6b56d-7747-4a7b-9da1-694c5c6f8b3b.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink 
hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,672 INFO  
org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - add lock 
heartbeat. 
entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json
 
ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink 
hadoop.ods_dms_db.ods_t_rep_docsource_type (1/1)#2] 2023-01-11 18:46:03,674 
INFO  org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - 
releasing lock. 
entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/v749.metadata.json
 
ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/22f4f9c5-35d2-467c-8263-d26fd6548017.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink 
hadoop.ods_dms_db.ods_t_rep_docsource_type (1/1)#2] 2023-01-11 18:46:03,674 
INFO  org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - 
released lock. 
entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/v749.metadata.json
 
ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/22f4f9c5-35d2-467c-8263-d26fd6548017.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink 
hadoop.ods_dms_db.ods_t_rep_docsource_type (1/1)#2] 2023-01-11 18:46:03,674 
INFO  org.apache.iceberg.hadoop.HadoopTableOperations              [] - 
Committed a new metadata file 
hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/v749.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink 
hadoop.ods_dms_db.ods_t_base_province (1/1)#2] 2023-01-11 18:46:03,677 INFO  
org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - releasing 
lock. 
entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/v749.metadata.json
 
ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/ced05720-8f32-49c9-884d-49b604290622.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink 
hadoop.ods_dms_db.ods_t_base_province (1/1)#2] 2023-01-11 18:46:03,677 INFO  
org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - released 
lock. 
entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/v749.metadata.json
 
ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/ced05720-8f32-49c9-884d-49b604290622.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink 
hadoop.ods_dms_db.ods_t_base_province (1/1)#2] 2023-01-11 18:46:03,677 INFO  
org.apache.iceberg.hadoop.HadoopTableOperations              [] - Committed a 
new metadata file 
hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/v749.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink 
hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,679 INFO  
org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - releasing 
lock. 
entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/v749.metadata.json
 
ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/27a6b56d-7747-4a7b-9da1-694c5c6f8b3b.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink 
hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,679 INFO  
org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - releasing 
lock. 
entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json
 
ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink 
hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,679 INFO  
org.apache.iceberg.util.LockManagers$InMemoryLockManager     [] - released 
lock. 
entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json
 
ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink 
hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,679 INFO  
org.apache.iceberg.hadoop.HadoopTableOperations              [] - Committed a 
new metadata file 
hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json
   [IcebergFilesCommitter -> Sink: IcebergSink 
hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,684 WARN  
org.apache.flink.runtime.taskmanager.Task                    [] - 
IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city 
(1/1)#2 (5dec3e28251735766bb3eb423ca5a45c) switched from RUNNING to FAILED with 
failure cause: java.lang.NullPointerException
        at org.apache.iceberg.util.LockManagers$InMemoryLockManager.release(LockManagers.java:235)
        at org.apache.iceberg.hadoop.HadoopTableOperations.renameToFinal(HadoopTableOperations.java:377)
        at org.apache.iceberg.hadoop.HadoopTableOperations.commit(HadoopTableOperations.java:159)
        at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:317)
        at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404)
        at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:214)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:198)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:190)
        at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:295)
        at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitOperation(IcebergFilesCommitter.java:312)
        at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitDeltaTxn(IcebergFilesCommitter.java:276)
        at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:218)
        at org.apache.iceberg.flink.sink.IcebergFilesCommitter.notifyCheckpointComplete(IcebergFilesCommitter.java:188)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:334)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1171)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$10(StreamTask.java:1136)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$12(StreamTask.java:1159)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.lang.Thread.run(Thread.java:748)
   `
   
   The code is as follows:
   `
       @VisibleForTesting
       void acquireOnce(String entityId, String ownerId) {
         InMemoryLockContent content = LOCKS.get(entityId);
         if (content != null && content.expireMs() > System.currentTimeMillis()) {
           throw new IllegalStateException(String.format("Lock for %s currently held by %s, expiration: %s",
                   entityId, content.ownerId(), content.expireMs()));
         }
   
         LOG.info("acquiring lock. entityId:{} ownerId:{}", entityId, ownerId);
         long expiration = System.currentTimeMillis() + heartbeatTimeoutMs();
         boolean succeed;
         if (content == null) {
           InMemoryLockContent previous = LOCKS.putIfAbsent(
                   entityId, new InMemoryLockContent(ownerId, expiration));
           succeed = previous == null;
         } else {
           succeed = LOCKS.replace(entityId, content, new InMemoryLockContent(ownerId, expiration));
         }
   
         if (succeed) {
           LOG.info("acquired lock. entityId:{} ownerId:{}", entityId, ownerId);
           // cleanup old heartbeat
           if (HEARTBEATS.containsKey(entityId)) {
             HEARTBEATS.remove(entityId).cancel(false);
             LOG.info("remove lock heartbeat. entityId:{} ownerId:{}", entityId, ownerId);
           }
   
           HEARTBEATS.put(entityId, scheduler().scheduleAtFixedRate(() -> {
             InMemoryLockContent lastContent = LOCKS.get(entityId);
             try {
               long newExpiration = System.currentTimeMillis() + heartbeatTimeoutMs();
               LOCKS.replace(entityId, lastContent, new InMemoryLockContent(ownerId, newExpiration));
             } catch (NullPointerException e) {
               throw new RuntimeException("Cannot heartbeat to a deleted lock " + entityId, e);
             }
   
           }, 0, heartbeatIntervalMs(), TimeUnit.MILLISECONDS));
           LOG.info("add lock heartbeat. entityId:{} ownerId:{}", entityId, ownerId);
   
         } else {
           throw new IllegalStateException("Unable to acquire lock " + entityId);
         }
       }
       @Override
       public boolean release(String entityId, String ownerId) {
         LOG.info("releasing lock. entityId:{} ownerId:{}", entityId, ownerId);
         InMemoryLockContent currentContent = LOCKS.get(entityId);
         if (currentContent == null) {
           LOG.error("Cannot find lock for entity {}", entityId);
           return false;
         }
   
         if (!currentContent.ownerId().equals(ownerId)) {
           LOG.error("Cannot unlock {} by {}, current owner: {}", entityId, ownerId, currentContent.ownerId());
           return false;
         }
   
         HEARTBEATS.remove(entityId).cancel(false);
         LOCKS.remove(entityId);
         LOG.info("released lock. entityId:{} ownerId:{}", entityId, ownerId);
         return true;
       }
   `
   
   HEARTBEATS.remove(entityId) returns null when the lock is released. 
Strangely, the heartbeat for this entityId was added at 2023-01-11 18:46:03,672, 
yet at 2023-01-11 18:46:03,684 the corresponding heartbeat cannot be found, even 
though the whole process is single-threaded.
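
For illustration only, here is a minimal sketch of a release path that tolerates a missing heartbeat entry instead of throwing. It is not the Iceberg implementation and not a confirmed fix: the class name, the simplified `LOCKS` map (entityId to ownerId), and the use of `ConcurrentHashMap` for both maps are assumptions made for this example. The log above shows entries from several sink tasks interleaved within the same millisecond; whether those tasks share the maps across threads is not established here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class NullSafeReleaseSketch {
  // Hypothetical stand-ins for the LOCKS and HEARTBEATS maps in the snippet above.
  // LOCKS is simplified to entityId -> ownerId; both maps are thread-safe here by assumption.
  static final Map<String, String> LOCKS = new ConcurrentHashMap<>();
  static final Map<String, ScheduledFuture<?>> HEARTBEATS = new ConcurrentHashMap<>();

  static boolean release(String entityId, String ownerId) {
    String currentOwner = LOCKS.get(entityId);
    if (currentOwner == null || !currentOwner.equals(ownerId)) {
      return false;
    }

    // remove() returns null when no heartbeat is registered, so check before cancel().
    ScheduledFuture<?> heartbeat = HEARTBEATS.remove(entityId);
    if (heartbeat != null) {
      heartbeat.cancel(false);
    }

    // Remove the lock only if it is still mapped to the same owner.
    return LOCKS.remove(entityId, ownerId);
  }

  public static void main(String[] args) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Lock with a registered heartbeat.
    LOCKS.put("entity-a", "owner-1");
    HEARTBEATS.put("entity-a",
        scheduler.scheduleAtFixedRate(() -> { }, 0, 100, TimeUnit.MILLISECONDS));

    // Lock whose heartbeat entry is missing, mirroring the situation in the stack trace.
    LOCKS.put("entity-b", "owner-2");

    System.out.println(release("entity-a", "owner-1"));
    System.out.println(release("entity-b", "owner-2"));
    scheduler.shutdownNow();
  }
}
```

With this guard, releasing a lock whose heartbeat entry has gone missing still removes the lock rather than failing the task, although it would also hide whatever caused the entry to disappear.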


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org