dajac commented on code in PR #16898:
URL: https://github.com/apache/kafka/pull/16898#discussion_r1721507976


##########
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##########
@@ -156,17 +164,45 @@ class CoordinatorLoaderImpl[T](
               } else {
                 batch.asScala.foreach { record =>
                   numRecords = numRecords + 1
-                  try {
-                    coordinator.replay(
-                      record.offset(),
-                      batch.producerId,
-                      batch.producerEpoch,
-                      deserializer.deserialize(record.key, record.value)
-                    )
-                  } catch {
-                    case ex: UnknownRecordTypeException =>
-                      warn(s"Unknown record type ${ex.unknownType} while 
loading offsets and group metadata " +
-                        s"from $tp. Ignoring it. It could be a left over from 
an aborted upgrade.")
+
+                  val coordinatorRecordOpt = {
+                    try {
+                      Some(deserializer.deserialize(record.key, record.value))
+                    } catch {
+                      case ex: UnknownRecordTypeException =>
+                        warn(s"Unknown record type ${ex.unknownType} while 
loading offsets and group metadata " +
+                          s"from $tp. Ignoring it. It could be a left over 
from an aborted upgrade.")
+                        None
+                      case ex: RuntimeException =>
+                        val msg = s"Deserializing record $record failed due 
to: ${ex.getMessage}."
+                        error(msg)
+                        throw new RuntimeException(msg, ex)
+                    }
+                  }
+
+                  coordinatorRecordOpt.foreach { coordinatorRecord =>
+                    try {
+                      if (isTraceEnabled) {
+                        trace(s"Replaying record $coordinatorRecord at offset 
${record.offset()} " +
+                          s"with producer id ${batch.producerId} and producer 
epoch ${batch.producerEpoch}.")
+                      }
+                      coordinator.replay(
+                        record.offset(),
+                        batch.producerId,
+                        batch.producerEpoch,
+                        coordinatorRecord
+                      )
+                    } catch {
+                      case ex: UnknownRecordTypeException =>
+                        warn(s"Unknown record type ${ex.unknownType} while 
loading offsets and group metadata " +
+                          s"from $tp. Ignoring it. It could be a left over 
from an aborted upgrade.")

Review Comment:
   Nope. It cannot. Let me remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to