lucasbru commented on code in PR #17116:
URL: https://github.com/apache/kafka/pull/17116#discussion_r1747481383


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -353,9 +356,11 @@ synchronized boolean lock(final TaskId taskId) {
             if (lockOwner.equals(Thread.currentThread())) {
                 log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
                 // we already own the lock
+                lockedTasksToBackoffRecord.remove(taskId);

Review Comment:
   can we ever have a backoff record in this branch?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java:
##########
@@ -93,7 +93,9 @@ static void registerStateStores(final Logger log,
         }
 
         final TaskId id = stateMgr.taskId();
-        if (!stateDirectory.lock(id)) {
+        if (!stateDirectory.canTryLock(id, System.currentTimeMillis())) {
+            log.debug("Task {} is still not allowed to retry acquiring the state directory lock", id);

Review Comment:
   trace level, otherwise we'd be flooded with debug logs again



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -681,4 +701,18 @@ public int hashCode() {
         }
     }
 
+    public static class BackoffRecord {
+        private long attempts;
+        private long lastAttemptMs;
+        private static final ExponentialBackoff EXPONENTIAL_BACKOFF = new ExponentialBackoff(1, 2, 1000, 0.5);
+
+
+        public BackoffRecord() {
+            this.attempts = 0;

Review Comment:
   shouldn't this initialize lastAttemptMs as well?
   
   also, you seem to be creating this record on the first attempt. Then attempts should be set to 1, no?
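A minimal sketch of the initialization this comment asks about, assuming the record is only created after the first failed lock attempt: the constructor takes that attempt's timestamp, so `attempts` starts at 1 and `lastAttemptMs` never keeps its default of 0. The class name `BackoffRecordSketch` and the helper `backoffMs` are hypothetical; `backoffMs` is a jitter-free doubling stand-in for the PR's `ExponentialBackoff(1, 2, 1000, 0.5)`, not Kafka's implementation.

```java
// Hypothetical sketch; not the PR's BackoffRecord.
final class BackoffRecordSketch {
    // Jitter-free stand-in for ExponentialBackoff(1, 2, 1000, 0.5):
    // start at 1 ms, double per attempt, cap at 1000 ms.
    private static final long INITIAL_MS = 1;
    private static final long MULTIPLIER = 2;
    private static final long MAX_MS = 1000;

    private long attempts;
    private long lastAttemptMs;

    // Created when the first lock attempt fails, so both fields describe that attempt.
    BackoffRecordSketch(final long nowMs) {
        this.attempts = 1;
        this.lastAttemptMs = nowMs;
    }

    // INITIAL_MS * MULTIPLIER^(attempts - 1), capped at MAX_MS.
    static long backoffMs(final long attempts) {
        long interval = INITIAL_MS;
        for (long i = 1; i < attempts && interval < MAX_MS; i++) {
            interval *= MULTIPLIER;
        }
        return Math.min(interval, MAX_MS);
    }

    // True once at least backoffMs(attempts) has passed since the last attempt.
    boolean canAttempt(final long nowMs) {
        return nowMs - lastAttemptMs >= backoffMs(attempts);
    }

    long attempts() {
        return attempts;
    }

    long lastAttemptMs() {
        return lastAttemptMs;
    }
}
```

If `canAttempt` compares elapsed time against the backoff as in this sketch, leaving `lastAttemptMs` at 0 would make `nowMs - lastAttemptMs` the full wall-clock time, so the first retry check would always pass and the initial backoff would be skipped.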



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java:
##########
@@ -93,7 +93,9 @@ static void registerStateStores(final Logger log,
         }
 
         final TaskId id = stateMgr.taskId();
-        if (!stateDirectory.lock(id)) {
+        if (!stateDirectory.canTryLock(id, System.currentTimeMillis())) {

Review Comment:
   If we enter this branch, the next line that will be printed is "acquired state directory lock". But this is not what we want, right? Don't we have to leave this method somehow?
   
   We should also test this behavior in `StateManagerUtilTest`.
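A simplified sketch of one way to leave the method early, together with the kind of check a test could make. It is hypothetical throughout: `LockGate` stands in for the two `StateDirectory` calls, `String` replaces `TaskId`, log lines are collected in a list instead of going through SLF4J, and the boolean return value is an assumption rather than the PR's signature (the real method might instead throw or report a status to its caller).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the StateDirectory calls used by registerStateStores.
interface LockGate {
    boolean canTryLock(String taskId, long nowMs);

    boolean lock(String taskId);
}

final class RegisterSketch {
    // Simplified, hypothetical version: returns true only if the lock was acquired
    // and store registration may continue.
    static boolean registerStateStores(final LockGate dir, final String id,
                                       final long nowMs, final List<String> log) {
        if (!dir.canTryLock(id, nowMs)) {
            log.add("trace: task " + id + " is still backing off");
            return false; // leave before the "acquired" line is logged
        }
        if (!dir.lock(id)) {
            log.add("trace: task " + id + " failed to lock the state directory");
            return false;
        }
        log.add("debug: Acquired state directory lock");
        // ... register the state stores here ...
        return true;
    }

    public static void main(final String[] args) {
        final List<String> log = new ArrayList<>();
        final LockGate backingOff = new LockGate() {
            public boolean canTryLock(final String taskId, final long nowMs) { return false; }
            public boolean lock(final String taskId) { throw new IllegalStateException("not expected"); }
        };
        System.out.println(registerStateStores(backingOff, "0_1", 0L, log) + " " + log);
    }
}
```

A `StateManagerUtilTest` case along these lines would stub `canTryLock` to return `false` and verify that neither `lock` nor the store registration is reached.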



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -365,10 +370,25 @@ synchronized boolean lock(final TaskId taskId) {
             lockedTasksToOwner.put(taskId, Thread.currentThread());
             // make sure the task directory actually exists, and create it if not
             getOrCreateDirectoryForTask(taskId);
+            lockedTasksToBackoffRecord.remove(taskId);
             return true;
         }
     }
 
+    public boolean canTryLock(final TaskId taskId, final long nowMs) {
+        return !lockedTasksToBackoffRecord.containsKey(taskId) || lockedTasksToBackoffRecord.get(taskId).canAttempt(nowMs);
+    }
+
+    private void updateOrCreateBackoffRecord(final TaskId taskId, final long nowMs) {
+        if (lockedTasksToBackoffRecord.containsKey(taskId)) {
+            final BackoffRecord backoffRecord = lockedTasksToBackoffRecord.get(taskId);
+            backoffRecord.lastAttemptMs = nowMs;

Review Comment:
   nit: maybe we could also move the update of the fields into a method of BackoffRecord, something like `backoffRecord.recordAttempt(nowMs)`?
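A sketch of what this nit could look like, assuming the map is only touched from `synchronized` methods, as `lock` is in `StateDirectory`: `BackoffRecord` updates its own fields through `recordAttempt(nowMs)`, so `updateOrCreateBackoffRecord` no longer writes `lastAttemptMs` directly, and `canTryLock` does a single `get` instead of `containsKey` followed by `get`. `String` replaces `TaskId`, `BackoffMapSketch`, `clearBackoff`, and `attempts` are hypothetical names added for illustration, and the backoff formula is a jitter-free stand-in for the PR's `ExponentialBackoff`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the StateDirectory side; not the PR's code.
final class BackoffMapSketch {
    static final class BackoffRecord {
        private long attempts;
        private long lastAttemptMs;

        BackoffRecord(final long nowMs) {
            this.attempts = 1;
            this.lastAttemptMs = nowMs;
        }

        // The record updates its own state instead of StateDirectory writing its fields.
        void recordAttempt(final long nowMs) {
            attempts++;
            lastAttemptMs = nowMs;
        }

        // Jitter-free exponential delay: 1, 2, 4, ... ms, capped at 1000 ms.
        boolean canAttempt(final long nowMs) {
            final long delayMs = Math.min(1L << Math.min(attempts - 1, 10), 1000L);
            return nowMs - lastAttemptMs >= delayMs;
        }

        long attempts() {
            return attempts;
        }
    }

    private final Map<String, BackoffRecord> lockedTasksToBackoffRecord = new HashMap<>();

    // Single lookup: no record means the task has never failed to lock.
    synchronized boolean canTryLock(final String taskId, final long nowMs) {
        final BackoffRecord record = lockedTasksToBackoffRecord.get(taskId);
        return record == null || record.canAttempt(nowMs);
    }

    // Called after a failed lock attempt.
    synchronized void updateOrCreateBackoffRecord(final String taskId, final long nowMs) {
        final BackoffRecord record = lockedTasksToBackoffRecord.get(taskId);
        if (record == null) {
            lockedTasksToBackoffRecord.put(taskId, new BackoffRecord(nowMs));
        } else {
            record.recordAttempt(nowMs);
        }
    }

    // Called after a successful lock attempt.
    synchronized void clearBackoff(final String taskId) {
        lockedTasksToBackoffRecord.remove(taskId);
    }

    // Number of recorded failed attempts, 0 if none.
    synchronized long attempts(final String taskId) {
        final BackoffRecord record = lockedTasksToBackoffRecord.get(taskId);
        return record == null ? 0 : record.attempts();
    }
}
```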



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -681,4 +701,18 @@ public int hashCode() {
         }
     }
 
+    public static class BackoffRecord {
+        private long attempts;
+        private long lastAttemptMs;
+        private static final ExponentialBackoff EXPONENTIAL_BACKOFF = new ExponentialBackoff(1, 2, 1000, 0.5);

Review Comment:
   Agreed, once per second still seems like a lot. We could start with 10 seconds and see if we want to increase it. wdyt, @cadonna?
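To put rough numbers on this trade-off, the sketch below counts how many retries fit into the first minute after a failure when each retry happens as soon as its backoff expires. It assumes jitter-free doubling from 1 ms (the PR's `ExponentialBackoff` also applies jitter, which is ignored here), and `BackoffSchedule`, `delayMs`, and `retriesWithin` are hypothetical names.

```java
// Hypothetical, jitter-free model of the retry schedule.
final class BackoffSchedule {
    // Delay before retry number `attempt` (starting at 1): 1 ms doubled per attempt, capped at maxMs.
    static long delayMs(final int attempt, final long maxMs) {
        long delay = 1;
        for (int i = 1; i < attempt && delay < maxMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMs);
    }

    // Number of retries that fit into windowMs if each one fires as soon as its delay expires.
    static int retriesWithin(final long windowMs, final long maxMs) {
        long elapsed = 0;
        int retries = 0;
        while (true) {
            final long next = elapsed + delayMs(retries + 1, maxMs);
            if (next > windowMs) {
                return retries;
            }
            elapsed = next;
            retries++;
        }
    }

    public static void main(final String[] args) {
        System.out.println("1 s cap:  " + retriesWithin(60_000, 1_000));
        System.out.println("10 s cap: " + retriesWithin(60_000, 10_000));
    }
}
```

Under this model, a 1000 ms cap allows 68 retries in the first minute (the doubling phase ends after about 1 s, then one retry per second), while a 10 000 ms cap allows 18.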



-- 
This is an automated message from the Apache Git Service.