lucasbru commented on code in PR #20325:
URL: https://github.com/apache/kafka/pull/20325#discussion_r2303883693


##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -112,6 +131,33 @@ class DefaultAutoTopicCreationManager(
     }
   }
 
+  override def getTopicCreationErrors(
+    topicNames: Set[String]
+  ): Map[String, String] = {
+    val currentTime = time.milliseconds()
+    val errors = mutable.Map.empty[String, String]
+    val expiredKeys = mutable.Set.empty[String]
+    
+    // Check requested topics and collect expired keys
+    topicNames.foreach { topicName =>
+      Option(topicCreationErrorCache.get(topicName)) match {
+          case Some(cachedError) if (currentTime - cachedError.timestamp) <= errorCacheTtlMs =>
+          errors.put(topicName, cachedError.errorMessage)
+        case Some(_) =>
+          expiredKeys += topicName
+        case None =>
+      }
+    }
+    
+    // Remove expired entries
+    expiredKeys.foreach { key =>
+      topicCreationErrorCache.remove(key)
+      debug(s"Removed expired topic creation error cache entry for $key")
+    }
+    
+    errors.toMap
+  }
+
   private def sendCreateTopicRequest(

Review Comment:
   `sendCreateTopicRequest` is used by both streams and other internal topics. I would suggest making a separate copy of it for this method, so that the code path for the other internal topics is not affected (the error message will not be used on those paths).
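   One way to read this suggestion, sketched below with purely hypothetical names (none of these stand-ins are from the PR or the real broker API): duplicate the send path so that only the streams variant records per-topic failures, leaving the existing internal-topic path untouched.

   ```java
   import java.util.Map;
   import java.util.Optional;
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical sketch: a streams-only copy of the create path records
   // failures into the error cache, while the original path stays as-is.
   class CreatePaths {
       final Map<String, String> errorCache = new ConcurrentHashMap<>();

       // Existing path for other internal topics: per-topic errors are dropped.
       void sendCreateTopicRequest(Set<String> topics) {
           topics.forEach(this::createTopic);
       }

       // Separate copy for streams internal topics: failures are cached so a
       // later request (e.g. a heartbeat) can report them to the client.
       void sendCreateStreamsTopicRequest(Set<String> topics) {
           topics.forEach(t -> createTopic(t).ifPresent(err -> errorCache.put(t, err)));
       }

       // Stand-in for the broker call; returns an error message on failure.
       private Optional<String> createTopic(String topic) {
           return topic.isEmpty() ? Optional.of("INVALID_TOPIC_EXCEPTION") : Optional.empty();
       }
   }
   ```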



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -112,6 +131,33 @@ class DefaultAutoTopicCreationManager(
     }
   }
 
+  override def getTopicCreationErrors(
+    topicNames: Set[String]
+  ): Map[String, String] = {
+    val currentTime = time.milliseconds()
+    val errors = mutable.Map.empty[String, String]
+    val expiredKeys = mutable.Set.empty[String]
+    
+    // Check requested topics and collect expired keys
+    topicNames.foreach { topicName =>
+      Option(topicCreationErrorCache.get(topicName)) match {
+          case Some(cachedError) if (currentTime - cachedError.timestamp) <= errorCacheTtlMs =>
+          errors.put(topicName, cachedError.errorMessage)
+        case Some(_) =>
+          expiredKeys += topicName
+        case None =>
+      }
+    }
+    
+    // Remove expired entries
+    expiredKeys.foreach { key =>
+      topicCreationErrorCache.remove(key)
+      debug(s"Removed expired topic creation error cache entry for $key")
+    }
+    

Review Comment:
   I think we still need to make sure that errors are eventually evicted even if we do not receive another topic creation request. I wonder if we shouldn't build an expiring cache that implements `org.apache.kafka.common.cache.Cache`. It could use a LinkedHashMap, similar to LRUCache, and every time we insert into or get from the cache, we try to expire the oldest elements of the LinkedHashMap. WDYT?
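   A minimal sketch of such an expiring cache (self-contained; this does not actually implement the real `org.apache.kafka.common.cache.Cache` interface, and the injected clock stands in for `Time::milliseconds`): entries live in an insertion-ordered LinkedHashMap, so expiry can walk from the head and stop at the first non-expired entry, since everything after it is newer.

   ```java
   import java.util.Iterator;
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.Optional;
   import java.util.function.LongSupplier;

   // Hypothetical sketch: a TTL cache backed by an insertion-ordered
   // LinkedHashMap. Every put/get first expires entries from the head,
   // which are necessarily the oldest.
   class ExpiringCache<K, V> {
       private static final class Entry<V> {
           final V value;
           final long insertedAtMs;
           Entry(V value, long insertedAtMs) { this.value = value; this.insertedAtMs = insertedAtMs; }
       }

       private final LinkedHashMap<K, Entry<V>> entries = new LinkedHashMap<>();
       private final long ttlMs;
       private final LongSupplier clockMs; // injected clock for testability

       ExpiringCache(long ttlMs, LongSupplier clockMs) {
           this.ttlMs = ttlMs;
           this.clockMs = clockMs;
       }

       private void expireOldest() {
           long now = clockMs.getAsLong();
           Iterator<Map.Entry<K, Entry<V>>> it = entries.entrySet().iterator();
           while (it.hasNext()) {
               if (now - it.next().getValue().insertedAtMs > ttlMs) it.remove();
               else break; // insertion order: everything after this is newer
           }
       }

       void put(K key, V value) {
           expireOldest();
           entries.put(key, new Entry<>(value, clockMs.getAsLong()));
       }

       Optional<V> get(K key) {
           expireOldest();
           return Optional.ofNullable(entries.get(key)).map(e -> e.value);
       }

       int size() { return entries.size(); }
   }
   ```

   This way expired entries are purged as a side effect of normal traffic, so a topic that never sees another creation request is still evicted the next time any key is inserted or read.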



##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -53,17 +55,34 @@ trait AutoTopicCreationManager {
     requestContext: RequestContext
   ): Unit
 
+  def getTopicCreationErrors(
+    topicNames: Set[String]
+  ): Map[String, String]
+
+  def close(): Unit = {}
+
+}
+
+case class CachedTopicCreationError(
+  errorMessage: String,
+  time: Time
+) {
+  val timestamp: Long = time.milliseconds()
 }
 
 class DefaultAutoTopicCreationManager(
   config: KafkaConfig,
   channelManager: NodeToControllerChannelManager,
   groupCoordinator: GroupCoordinator,
   txnCoordinator: TransactionCoordinator,
-  shareCoordinator: ShareCoordinator
+  shareCoordinator: ShareCoordinator,
+  time: Time
 ) extends AutoTopicCreationManager with Logging {
 
  private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
+  private val topicCreationErrorCache = new ConcurrentHashMap[String, CachedTopicCreationError]()
+  // Use session timeout for better semantic alignment with client lifecycle
+  private val errorCacheTtlMs = config.groupCoordinatorConfig.classicGroupMaxSessionTimeoutMs.toLong

Review Comment:
   We'd want to pass this in from the outside. Session timeouts can be overridden for a specific group, so we need to use `groupConfigManager`, which is available in `KafkaApis`.
   
   ```java
   Optional<GroupConfig> groupConfig = groupConfigManager.groupConfig(groupId);
   return groupConfig.map(GroupConfig::streamsSessionTimeoutMs)
       .orElse(config.groupCoordinatorConfig.streamsGroupSessionTimeoutMs());
   ```
   
   I would also re-fetch this value every time we create an error message, 
since the session timeout may change over time. So better pass this into 
`createStreamsInternalTopics`.
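   A minimal sketch of that per-call resolution, with the `groupConfigManager` lookup replaced by a plain function and all names hypothetical: because the timeout is resolved on every call rather than captured once at construction, a later per-group override takes effect immediately.

   ```java
   import java.util.Optional;
   import java.util.function.Function;

   // Hypothetical sketch: resolve the effective streams session timeout on
   // every call, preferring a per-group override over the broker default.
   class SessionTimeoutResolver {
       private final Function<String, Optional<Integer>> groupOverride; // stand-in for groupConfigManager
       private final int defaultTimeoutMs;                              // stand-in for the broker config

       SessionTimeoutResolver(Function<String, Optional<Integer>> groupOverride, int defaultTimeoutMs) {
           this.groupOverride = groupOverride;
           this.defaultTimeoutMs = defaultTimeoutMs;
       }

       int effectiveTimeoutMs(String groupId) {
           return groupOverride.apply(groupId).orElse(defaultTimeoutMs);
       }
   }
   ```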
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
