lucasbru commented on code in PR #17116:
URL: https://github.com/apache/kafka/pull/17116#discussion_r1747481383
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -353,9 +356,11 @@ synchronized boolean lock(final TaskId taskId) {
if (lockOwner.equals(Thread.currentThread())) {
log.trace("{} Found cached state dir lock for task {}",
logPrefix(), taskId);
// we already own the lock
+ lockedTasksToBackoffRecord.remove(taskId);
Review Comment:
can we ever have a backoff record in this branch?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java:
##########
@@ -93,7 +93,9 @@ static void registerStateStores(final Logger log,
}
final TaskId id = stateMgr.taskId();
- if (!stateDirectory.lock(id)) {
+ if (!stateDirectory.canTryLock(id, System.currentTimeMillis())) {
+ log.debug("Task {} is still not allowed to retry acquiring the state directory lock", id);
Review Comment:
This should be trace level; otherwise we'd be flooding the debug logs again.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -681,4 +701,18 @@ public int hashCode() {
}
}
+ public static class BackoffRecord {
+ private long attempts;
+ private long lastAttemptMs;
+ private static final ExponentialBackoff EXPONENTIAL_BACKOFF = new ExponentialBackoff(1, 2, 1000, 0.5);
+
+
+ public BackoffRecord() {
+ this.attempts = 0;
Review Comment:
Shouldn't this initialize `lastAttemptMs` as well?
Also, you seem to be creating this record on the first attempt. Then `attempts` should be set to 1, no?
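A minimal sketch of what the two points above could look like, folded together with the `recordAttempt(nowMs)` nit further down: the record is created on the first failed attempt, so its constructor takes `nowMs`, starts at `attempts = 1`, and stamps `lastAttemptMs`. This is not the PR's code. To stay self-contained, Kafka's `ExponentialBackoff` is replaced by a hypothetical jitter-free `backoffMs` helper (initial interval 1 ms, doubling, capped at 1000 ms), and the accessors are added only for illustration.

```java
// Hypothetical stand-alone version of StateDirectory.BackoffRecord (not the PR's code).
// Assumption: a jitter-free doubling backoff replaces Kafka's ExponentialBackoff.
final class BackoffRecord {
    private static final long INITIAL_INTERVAL_MS = 1;
    private static final long MULTIPLIER = 2;
    private static final long MAX_INTERVAL_MS = 1000;

    private long attempts;
    private long lastAttemptMs;

    // Created when the first lock attempt fails, so that attempt is
    // already counted and its timestamp recorded.
    BackoffRecord(final long nowMs) {
        this.attempts = 1;
        this.lastAttemptMs = nowMs;
    }

    // Bookkeeping for another failed attempt.
    void recordAttempt(final long nowMs) {
        attempts++;
        lastAttemptMs = nowMs;
    }

    // A retry is allowed once the backoff for the current attempt count has elapsed.
    boolean canAttempt(final long nowMs) {
        return nowMs - lastAttemptMs >= backoffMs(attempts);
    }

    // initial * multiplier^attempts, capped at the maximum interval.
    static long backoffMs(final long attempts) {
        long interval = INITIAL_INTERVAL_MS;
        for (long i = 0; i < attempts && interval < MAX_INTERVAL_MS; i++) {
            interval *= MULTIPLIER;
        }
        return Math.min(interval, MAX_INTERVAL_MS);
    }

    long attempts() {
        return attempts;
    }

    long lastAttemptMs() {
        return lastAttemptMs;
    }
}
```

With this shape, a record created at `nowMs = 100` waits 2 ms before the next attempt is allowed, then 4 ms after the next recorded failure, and so on up to the cap.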
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java:
##########
@@ -93,7 +93,9 @@ static void registerStateStores(final Logger log,
}
final TaskId id = stateMgr.taskId();
- if (!stateDirectory.lock(id)) {
+ if (!stateDirectory.canTryLock(id, System.currentTimeMillis())) {
Review Comment:
If we enter this branch, the next line that will be printed is "acquired state directory lock". But this is not what we want, right? Don't we have to leave this method somehow?
We should also test this behavior in `StateManagerUtilTest`.
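One way to leave the method, sketched with a hypothetical `TaskLocker` interface in place of `StateDirectory`. A `String` stands in for `TaskId`, and a `List<String>` collects log lines so a test can check that "Acquired state directory lock" is never printed while backing off. Returning `false` is just one option; throwing an exception the caller already treats as retriable may fit the real call sites better.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the StateDirectory methods used by registerStateStores.
interface TaskLocker {
    boolean canTryLock(String taskId, long nowMs);

    boolean lock(String taskId);
}

final class RegisterStateStoresSketch {
    // Returns true only if the lock was acquired and registration may proceed.
    static boolean registerStateStores(final TaskLocker locker,
                                       final String taskId,
                                       final long nowMs,
                                       final List<String> logLines) {
        if (!locker.canTryLock(taskId, nowMs)) {
            logLines.add("TRACE Task " + taskId
                + " is still not allowed to retry acquiring the state directory lock");
            // Leave the method so we never fall through to "Acquired state directory lock".
            return false;
        }
        if (!locker.lock(taskId)) {
            logLines.add("DEBUG Failed to lock the state directory for task " + taskId);
            return false;
        }
        logLines.add("DEBUG Acquired state directory lock");
        // ... stores would be registered here ...
        return true;
    }

    public static void main(final String[] args) {
        final List<String> logLines = new ArrayList<>();
        // A locker whose backoff has not elapsed; lock() must not be reached.
        final TaskLocker backingOff = new TaskLocker() {
            @Override
            public boolean canTryLock(final String taskId, final long nowMs) {
                return false;
            }

            @Override
            public boolean lock(final String taskId) {
                throw new AssertionError("lock() must not be called while backing off");
            }
        };
        System.out.println(registerStateStores(backingOff, "0_0", 0L, logLines));
        System.out.println(logLines);
    }
}
```

A `StateManagerUtilTest` case would presumably do the same with a mocked `StateDirectory`: stub `canTryLock` to return `false`, then verify that `lock` is never called and the stores are not registered.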
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -365,10 +370,25 @@ synchronized boolean lock(final TaskId taskId) {
lockedTasksToOwner.put(taskId, Thread.currentThread());
// make sure the task directory actually exists, and create it if not
getOrCreateDirectoryForTask(taskId);
+ lockedTasksToBackoffRecord.remove(taskId);
return true;
}
}
+ public boolean canTryLock(final TaskId taskId, final long nowMs) {
+ return !lockedTasksToBackoffRecord.containsKey(taskId) || lockedTasksToBackoffRecord.get(taskId).canAttempt(nowMs);
+ }
+
+ private void updateOrCreateBackoffRecord(final TaskId taskId, final long nowMs) {
+ if (lockedTasksToBackoffRecord.containsKey(taskId)) {
+ final BackoffRecord backoffRecord = lockedTasksToBackoffRecord.get(taskId);
+ backoffRecord.lastAttemptMs = nowMs;
Review Comment:
nit: maybe we could also move the field updates into a method of `BackoffRecord`, something like `backoffRecord.recordAttempt(nowMs)`?
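A sketch of that refactoring, assuming the record counts its first failed attempt when it is created: `updateOrCreateBackoffRecord` becomes a single `Map.compute` call, and the field updates live in `recordAttempt(nowMs)`. `AttemptRecord` is a hypothetical, trimmed-down stand-in for `BackoffRecord` (no backoff computation), and `String` replaces `TaskId`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, trimmed-down stand-in for BackoffRecord: bookkeeping only.
final class AttemptRecord {
    private long attempts;
    private long lastAttemptMs;

    AttemptRecord(final long nowMs) {
        this.attempts = 1;          // creation corresponds to the first failed attempt
        this.lastAttemptMs = nowMs;
    }

    // Field updates live inside the record instead of in StateDirectory.
    void recordAttempt(final long nowMs) {
        attempts++;
        lastAttemptMs = nowMs;
    }

    long attempts() {
        return attempts;
    }

    long lastAttemptMs() {
        return lastAttemptMs;
    }
}

final class BackoffBookkeeping {
    // Stand-in for lockedTasksToBackoffRecord.
    private final Map<String, AttemptRecord> lockedTasksToBackoffRecord = new HashMap<>();

    // Creates the record on the first failure, otherwise delegates to recordAttempt(nowMs).
    void updateOrCreateBackoffRecord(final String taskId, final long nowMs) {
        lockedTasksToBackoffRecord.compute(taskId, (id, record) -> {
            if (record == null) {
                return new AttemptRecord(nowMs);
            }
            record.recordAttempt(nowMs);
            return record;
        });
    }

    AttemptRecord get(final String taskId) {
        return lockedTasksToBackoffRecord.get(taskId);
    }
}
```

Besides keeping `BackoffRecord`'s fields private to the record, `compute` also replaces the separate `containsKey`/`get` lookups with one map operation.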
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -681,4 +701,18 @@ public int hashCode() {
}
}
+ public static class BackoffRecord {
+ private long attempts;
+ private long lastAttemptMs;
+ private static final ExponentialBackoff EXPONENTIAL_BACKOFF = new ExponentialBackoff(1, 2, 1000, 0.5);
Review Comment:
Agreed, once per second still seems like a lot. We could start with 10 seconds and see whether we want to increase it. WDYT, @cadonna?
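To put numbers on this, here is a jitter-free approximation of the schedule, reading the `ExponentialBackoff(1, 2, 1000, 0.5)` arguments as initial interval 1 ms, multiplier 2, maximum 1000 ms and jitter 0.5. The formula used (initial × multiplier^attempts, capped at the maximum) is an assumption about how the interval grows; the random jitter is left out so the output is deterministic, and the suggested 10 seconds is treated as the cap.

```java
import java.util.ArrayList;
import java.util.List;

final class BackoffSchedule {
    // Jitter-free approximation: initial * multiplier^attempts, capped at maxMs.
    static long backoffMs(final long attempts, final long initialMs,
                          final long multiplier, final long maxMs) {
        long interval = initialMs;
        for (long i = 0; i < attempts && interval < maxMs; i++) {
            interval *= multiplier;
        }
        return Math.min(interval, maxMs);
    }

    // First n backoff values, for attempts = 0 .. n - 1.
    static List<Long> schedule(final long initialMs, final long multiplier,
                               final long maxMs, final int n) {
        final List<Long> values = new ArrayList<>();
        for (int attempts = 0; attempts < n; attempts++) {
            values.add(backoffMs(attempts, initialMs, multiplier, maxMs));
        }
        return values;
    }

    public static void main(final String[] args) {
        System.out.println("cap 1s:  " + schedule(1, 2, 1_000, 16));
        System.out.println("cap 10s: " + schedule(1, 2, 10_000, 16));
    }
}
```

With the 1 s cap the values are 1, 2, 4, ..., 512 ms and then 1000 ms from the eleventh value on, so a task that stays blocked settles into roughly one retry per second after about a second of total waiting. With a 10 s cap the interval keeps doubling up to 8192 ms before settling at 10000 ms.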